diff --git a/include/depthai/device/DeviceBase.hpp b/include/depthai/device/DeviceBase.hpp index a2eac9caf..eaf7adda6 100644 --- a/include/depthai/device/DeviceBase.hpp +++ b/include/depthai/device/DeviceBase.hpp @@ -920,8 +920,7 @@ class DeviceBase { std::chrono::steady_clock::time_point lastWatchdogPingTime; // closed - mutable std::mutex closedMtx; - bool closed{false}; + std::atomic closed{false}; // pimpl class Impl; diff --git a/include/depthai/xlink/XLinkConnection.hpp b/include/depthai/xlink/XLinkConnection.hpp index 1696e164a..484f8d2ec 100644 --- a/include/depthai/xlink/XLinkConnection.hpp +++ b/include/depthai/xlink/XLinkConnection.hpp @@ -37,8 +37,8 @@ struct DeviceInfo { std::string getMxId() const; std::string toString() const; - std::string name = ""; - std::string mxid = ""; + std::string name; + std::string mxid; XLinkDeviceState_t state = X_LINK_ANY_STATE; XLinkProtocol_t protocol = X_LINK_ANY_PROTOCOL; XLinkPlatform_t platform = X_LINK_ANY_PLATFORM; @@ -148,8 +148,7 @@ class XLinkConnection { DeviceInfo deviceInfo; // closed - mutable std::mutex closedMtx; - bool closed{false}; + std::atomic closed{false}; constexpr static std::chrono::milliseconds WAIT_FOR_BOOTUP_TIMEOUT{15000}; constexpr static std::chrono::milliseconds WAIT_FOR_CONNECT_TIMEOUT{5000}; diff --git a/include/depthai/xlink/XLinkStream.hpp b/include/depthai/xlink/XLinkStream.hpp index 47bc9ace2..c6f3fd1f0 100644 --- a/include/depthai/xlink/XLinkStream.hpp +++ b/include/depthai/xlink/XLinkStream.hpp @@ -43,11 +43,11 @@ class XLinkStream { streamId_t streamId{INVALID_STREAM_ID}; public: - XLinkStream(const std::shared_ptr conn, const std::string& name, std::size_t maxWriteSize); + XLinkStream(const std::shared_ptr& conn, const std::string& name, std::size_t maxWriteSize); XLinkStream(const XLinkStream&) = delete; - XLinkStream(XLinkStream&& stream); + XLinkStream(XLinkStream&& stream) noexcept; XLinkStream& operator=(const XLinkStream&) = delete; - XLinkStream& operator=(XLinkStream&& stream); + XLinkStream& operator=(XLinkStream&& stream) noexcept; ~XLinkStream(); // Blocking @@ -89,11 +89,11 @@ struct XLinkError : public std::runtime_error { : runtime_error(message), status(statusID), streamName(std::move(stream)) {} }; struct XLinkReadError : public XLinkError { - using XLinkError = XLinkError; + using XLinkError::XLinkError; XLinkReadError(XLinkError_t status, const std::string& stream); }; struct XLinkWriteError : public XLinkError { - using XLinkError = XLinkError; + using XLinkError::XLinkError; XLinkWriteError(XLinkError_t status, const std::string& stream); }; diff --git a/src/device/DeviceBase.cpp b/src/device/DeviceBase.cpp index 7242bd9e6..b8bdc7b61 100644 --- a/src/device/DeviceBase.cpp +++ b/src/device/DeviceBase.cpp @@ -281,8 +281,6 @@ class DeviceBase::Impl { DeviceLogger logger{"host", stdoutColorSink}; // RPC - std::mutex rpcMutex; - std::shared_ptr rpcStream; std::unique_ptr> rpcClient; void setLogLevel(LogLevel level); @@ -480,11 +478,10 @@ DeviceBase::DeviceBase(Config config, const DeviceInfo& devInfo) : deviceInfo(de } void DeviceBase::close() { - std::unique_lock lock(closedMtx); - if(!closed) { - closeImpl(); - closed = true; - } + // Only allow to close once + if(closed.exchange(true)) return; + + closeImpl(); } void DeviceBase::closeImpl() { @@ -516,10 +513,6 @@ void DeviceBase::closeImpl() { // At the end stop the monitor thread if(monitorThread.joinable()) monitorThread.join(); - // Close rpcStream - pimpl->rpcStream = nullptr; - pimpl->rpcClient = nullptr; - pimpl->logger.debug("Device closed, {}", duration_cast(steady_clock::now() - t1).count()); } @@ -527,7 +520,6 @@ void DeviceBase::closeImpl() { // is invalidated during the return by value and continues to degrade in // validity to the caller bool DeviceBase::isClosed() const { - std::unique_lock lock(closedMtx); return closed || !watchdogRunning; } @@ -542,8 +534,10 @@ void DeviceBase::tryStartPipeline(const Pipeline& pipeline) { } } catch(const std::exception&) { // close device (cleanup) + // can throw within itself, e.g. from the rpcclient lambda with an xlink error close(); - // Rethrow original exception + + // Rethrow original exception when close() itself doesn't throw throw; } } @@ -673,7 +667,7 @@ void DeviceBase::init2(Config cfg, const dai::Path& pathToMvcmd, tl::optionallogger.debug("Device - BoardConfig: {} \nlibnop:{}", jBoardConfig.dump(), spdlog::to_hex(utility::serialize(config.board))); } @@ -726,32 +720,32 @@ void DeviceBase::init2(Config cfg, const dai::Path& pathToMvcmd, tl::optionalrpcStream = std::make_shared(connection, device::XLINK_CHANNEL_MAIN_RPC, device::XLINK_USB_BUFFER_MAX_SIZE); - auto rpcStream = pimpl->rpcStream; - - pimpl->rpcClient = std::make_unique>([this, rpcStream](nanorpc::core::type::buffer request) { - // Lock for time of the RPC call, to not mix the responses between calling threads. - // Note: might cause issues on Windows on incorrect shutdown. To be investigated - std::unique_lock lock(pimpl->rpcMutex); - - // Log the request data - if(logger::get_level() == spdlog::level::trace) { - pimpl->logger.trace("RPC: {}", nlohmann::json::from_msgpack(request).dump()); - } + pimpl->rpcClient = std::make_unique>( + [rpcMutex = std::make_shared(), + rpcStream = std::make_shared(connection, device::XLINK_CHANNEL_MAIN_RPC, device::XLINK_USB_BUFFER_MAX_SIZE), + &implLogger = this->pimpl->logger](nanorpc::core::type::buffer request) { + // Lock for time of the RPC call, to not mix the responses between calling threads. + // Note: might cause issues on Windows on incorrect shutdown. To be investigated + std::lock_guard lock(*rpcMutex); + + // Log the request data + if(logger::get_level() == spdlog::level::trace) { + implLogger.trace("RPC: {}", nlohmann::json::from_msgpack(request).dump()); + } - try { - // Send request to device - rpcStream->write(std::move(request)); - - // Receive response back - // Send to nanorpc to parse - return rpcStream->read(); - } catch(const std::exception& e) { - // If any exception is thrown, log it and rethrow - pimpl->logger.debug("RPC error: {}", e.what()); - throw std::system_error(std::make_error_code(std::errc::io_error), "Device already closed or disconnected"); - } - }); + try { + // Send request to device + rpcStream->write(request); + + // Receive response back + // Send to nanorpc to parse + return rpcStream->read(); + } catch(const std::exception& e) { + // If any exception is thrown, log it and rethrow + implLogger.debug("RPC error: {}", e.what()); + throw std::system_error(std::make_error_code(std::errc::io_error), "Device already closed or disconnected"); + } + }); // prepare watchdog thread, which will keep device alive // separate stream so it doesn't miss between potentially long RPC calls @@ -912,12 +906,8 @@ void DeviceBase::init2(Config cfg, const dai::Path& pathToMvcmd, tl::optionallogger.debug("Profiling write speed: {:.2f} MiB/s, read speed: {:.2f} MiB/s, total written: {:.2f} MiB, read: {:.2f} MiB", w / 1024.0f / 1024.0f, @@ -925,6 +915,7 @@ void DeviceBase::init2(Config cfg, const dai::Path& pathToMvcmd, tl::optional(1) / rate); } } catch(const std::exception& ex) { @@ -1383,6 +1374,7 @@ bool DeviceBase::startPipelineImpl(const Pipeline& pipeline) { const std::string streamAssetStorage = "__stream_asset_storage"; std::thread t1([this, &streamAssetStorage, &assetStorage]() { XLinkStream stream(connection, streamAssetStorage, device::XLINK_USB_BUFFER_MAX_SIZE); + // TODO replace this code with XLinkStream::writeSplit() int64_t offset = 0; do { int64_t toTransfer = std::min(static_cast(device::XLINK_USB_BUFFER_MAX_SIZE), static_cast(assetStorage.size() - offset)); @@ -1395,9 +1387,6 @@ bool DeviceBase::startPipelineImpl(const Pipeline& pipeline) { t1.join(); } - // print assets on device side for test - pimpl->rpcClient->call("printAssets"); - // Build and start the pipeline bool success = false; std::string errorMsg; @@ -1406,7 +1395,6 @@ bool DeviceBase::startPipelineImpl(const Pipeline& pipeline) { pimpl->rpcClient->call("startPipeline"); } else { throw std::runtime_error(errorMsg); - return false; } return true; diff --git a/src/device/DeviceBootloader.cpp b/src/device/DeviceBootloader.cpp index ad33db4fd..833de72bf 100644 --- a/src/device/DeviceBootloader.cpp +++ b/src/device/DeviceBootloader.cpp @@ -220,7 +220,7 @@ std::vector DeviceBootloader::createDepthaiApplicationPackage( for(std::size_t i = 0; i < assetStorage.size(); i++) fwPackage[assetStorageSection->offset + i] = assetStorage[i]; // Debug - if(logger::get_level() == spdlog::level::debug) { + if(logger::get_level() <= spdlog::level::debug) { SBR_SECTION* cur = &sbr.sections[0]; logger::debug("DepthAI Application Package"); for(; cur != lastSection + 1; cur++) { diff --git a/src/pipeline/datatype/StreamMessageParser.cpp b/src/pipeline/datatype/StreamMessageParser.cpp index 63d7c4bda..8be22a464 100644 --- a/src/pipeline/datatype/StreamMessageParser.cpp +++ b/src/pipeline/datatype/StreamMessageParser.cpp @@ -207,7 +207,7 @@ std::shared_ptr StreamMessageParser::parseMessageToADatatype(streamPa // RawBuffer is special case, no metadata is actually serialized auto pBuf = std::make_shared(); pBuf->data = std::move(data); - return std::make_shared(pBuf); + return std::make_shared(std::move(pBuf)); } break; case DatatypeEnum::ImgFrame: diff --git a/src/utility/Logging.hpp b/src/utility/Logging.hpp index efc721588..b3ed5f329 100644 --- a/src/utility/Logging.hpp +++ b/src/utility/Logging.hpp @@ -26,7 +26,7 @@ namespace dai { class Logging { // private constructor Logging(); - ~Logging() {} + ~Logging() = default; public: static Logging& getInstance() { @@ -38,7 +38,7 @@ class Logging { // Public API spdlog::logger logger; - spdlog::level::level_enum parseLevel(std::string lvl); + static spdlog::level::level_enum parseLevel(std::string lvl); }; diff --git a/src/utility/PimplImpl.hpp b/src/utility/PimplImpl.hpp index 9b2204585..c6feda780 100644 --- a/src/utility/PimplImpl.hpp +++ b/src/utility/PimplImpl.hpp @@ -14,7 +14,7 @@ Pimpl::Pimpl( Args&& ...args ) : m{ new T{ std::forward(args)... } } { } template -Pimpl::~Pimpl() { } +Pimpl::~Pimpl() = default; template T* Pimpl::operator->() { return m.get(); } diff --git a/src/utility/XLinkGlobalProfilingLogger.cpp b/src/utility/XLinkGlobalProfilingLogger.cpp index 7de1de442..0e9181989 100644 --- a/src/utility/XLinkGlobalProfilingLogger.cpp +++ b/src/utility/XLinkGlobalProfilingLogger.cpp @@ -25,10 +25,9 @@ void XLinkGlobalProfilingLogger::enable(bool en) { XLinkProf_t prof; XLinkGetGlobalProfilingData(&prof); - long long w = prof.totalWriteBytes - lastProf.totalWriteBytes; - long long r = prof.totalReadBytes - lastProf.totalReadBytes; - w /= rate.load(); - r /= rate.load(); + const auto rateSnapshot = rate.load(); + const float w = (prof.totalWriteBytes - lastProf.totalWriteBytes) / rateSnapshot; + const float r = (prof.totalReadBytes - lastProf.totalReadBytes) / rateSnapshot; logger::debug("Profiling global write speed: {:.2f} MiB/s, read speed: {:.2f} MiB/s, total written: {:.2f} MiB, read: {:.2f} MiB", w / 1024.0f / 1024.0f, @@ -36,8 +35,8 @@ void XLinkGlobalProfilingLogger::enable(bool en) { prof.totalWriteBytes / 1024.0f / 1024.0f, prof.totalReadBytes / 1024.0f / 1024.0f); - lastProf = prof; - this_thread::sleep_for(duration(1) / rate.load()); + lastProf = std::move(prof); + this_thread::sleep_for(duration(1) / rateSnapshot); } }); } diff --git a/src/xlink/XLinkConnection.cpp b/src/xlink/XLinkConnection.cpp index 8d44ea7dc..c7c6a765d 100644 --- a/src/xlink/XLinkConnection.cpp +++ b/src/xlink/XLinkConnection.cpp @@ -25,16 +25,13 @@ extern "C" { #include "spdlog/spdlog.h" #include "utility/Logging.hpp" +using namespace std::chrono_literals; +using std::chrono::steady_clock; + namespace dai { -DeviceInfo::DeviceInfo(const deviceDesc_t& desc) { - name = std::string(desc.name); - mxid = std::string(desc.mxid); - state = desc.state; - protocol = desc.protocol; - platform = desc.platform; - status = desc.status; -} +DeviceInfo::DeviceInfo(const deviceDesc_t& desc) + : name{desc.name}, mxid{desc.mxid}, state{desc.state}, protocol{desc.protocol}, platform{desc.platform}, status{desc.status} {} DeviceInfo::DeviceInfo(std::string name, std::string mxid, XLinkDeviceState_t state, XLinkProtocol_t protocol, XLinkPlatform_t platform, XLinkError_t status) : name(std::move(name)), mxid(std::move(mxid)), state(state), protocol(protocol), platform(platform), status(status) {} @@ -42,7 +39,7 @@ DeviceInfo::DeviceInfo(std::string name, std::string mxid, XLinkDeviceState_t st DeviceInfo::DeviceInfo(std::string mxidOrName) { // Parse parameter and set to ip if any dots found // mxid doesn't have a dot in the name - if(mxidOrName.find(".") != std::string::npos) { + if(mxidOrName.find('.') != std::string::npos) { // This is reasoned as an IP address or USB path (name). Set rest of info accordingly name = std::move(mxidOrName); mxid = ""; @@ -105,7 +102,7 @@ static XLinkProtocol_t getDefaultProtocol() { return defaultProtocol; } -// STATIC +// STATIC definitions constexpr std::chrono::milliseconds XLinkConnection::WAIT_FOR_BOOTUP_TIMEOUT; constexpr std::chrono::milliseconds XLinkConnection::WAIT_FOR_CONNECT_TIMEOUT; constexpr std::chrono::milliseconds XLinkConnection::POLLING_DELAY_TIME; @@ -189,7 +186,7 @@ std::tuple XLinkConnection::getDeviceByMxId(std::string mxId, initialize(); DeviceInfo dev; - dev.mxid = mxId; + dev.mxid = std::move(mxId); dev.state = state; deviceDesc_t desc = {}; @@ -216,8 +213,6 @@ std::tuple XLinkConnection::getDeviceByMxId(std::string mxId, DeviceInfo XLinkConnection::bootBootloader(const DeviceInfo& deviceInfo) { initialize(); - using namespace std::chrono; - // Device is in flash booted state. Boot to bootloader first auto deviceDesc = deviceInfo.getXLinkDeviceDesc(); @@ -240,7 +235,6 @@ DeviceInfo XLinkConnection::bootBootloader(const DeviceInfo& deviceInfo) { deviceDesc_t foundDeviceDesc = {}; // Wait for device to get to bootloader state - XLinkError_t rc; auto bootupTimeout = WAIT_FOR_BOOTUP_TIMEOUT; // Override with environment variables, if set @@ -248,22 +242,22 @@ DeviceInfo XLinkConnection::bootBootloader(const DeviceInfo& deviceInfo) { {"DEPTHAI_BOOTUP_TIMEOUT", &bootupTimeout}, }; - for(auto ev : evars) { - auto name = ev.first; - auto valstr = utility::getEnv(name); + for(const auto& ev : evars) { + const auto valstr = utility::getEnv(ev.first); if(!valstr.empty()) { try { std::chrono::milliseconds value{std::stoi(valstr)}; *ev.second = value; // auto initial = *ev.second; - // logger::warn("{} override: {} -> {}", name, initial, value); + // logger::warn("{} override: {} -> {}", ev.first, initial, value); } catch(const std::invalid_argument& e) { - logger::warn("{} value invalid: {}", name, e.what()); + logger::warn("{} value invalid: {}", ev.first, e.what()); } } } auto tstart = steady_clock::now(); + XLinkError_t rc = X_LINK_ERROR; do { rc = XLinkFindFirstSuitableDevice(descToWait, &foundDeviceDesc); if(rc == X_LINK_SUCCESS) break; @@ -303,24 +297,21 @@ XLinkConnection::XLinkConnection(const DeviceInfo& deviceDesc, XLinkDeviceState_ // within the context of the lock_guard. The value is immediately invalid and outdated // when it is returned by value to the caller bool XLinkConnection::isClosed() const { - std::lock_guard lock(closedMtx); return closed; } void XLinkConnection::close() { - std::lock_guard lock(closedMtx); - if(closed) return; + if(closed.exchange(true)) return; - using namespace std::chrono; - constexpr auto RESET_TIMEOUT = seconds(2); - constexpr auto BOOTUP_SEARCH = seconds(5); + constexpr auto RESET_TIMEOUT = 2s; + constexpr auto BOOTUP_SEARCH = 5s; // majority of usb reboots on Windows need more than 5 seconds; this leads to a cascade failure if(deviceLinkId != -1 && rebootOnDestruction) { auto previousLinkId = deviceLinkId; - auto ret = XLinkResetRemoteTimeout(deviceLinkId, duration_cast(RESET_TIMEOUT).count()); + const auto ret = XLinkResetRemoteTimeout(deviceLinkId, static_cast(std::chrono::duration_cast(RESET_TIMEOUT).count())); if(ret != X_LINK_SUCCESS) { - logger::debug("XLinkResetRemoteTimeout returned: {}", XLinkErrorToStr(ret)); + logger::debug("XLinkResetRemoteTimeout({}) returned: {}", previousLinkId, XLinkErrorToStr(ret)); } deviceLinkId = -1; @@ -330,23 +321,31 @@ void XLinkConnection::close() { // Wait till same device reappears (is rebooted). // Only in case if device was booted to begin with if(bootDevice) { - auto t1 = steady_clock::now(); - bool found = false; - do { + const auto t1 = steady_clock::now(); + while(true) { + bool found = false; DeviceInfo rebootingDeviceInfo; std::tie(found, rebootingDeviceInfo) = XLinkConnection::getDeviceByMxId(deviceInfo.getMxId(), X_LINK_ANY_STATE, false); - if(found) { - if(rebootingDeviceInfo.state == X_LINK_UNBOOTED || rebootingDeviceInfo.state == X_LINK_BOOTLOADER) { - break; + if(found && (rebootingDeviceInfo.state == X_LINK_UNBOOTED || rebootingDeviceInfo.state == X_LINK_BOOTLOADER)) { + break; + } + if(steady_clock::now() - t1 >= BOOTUP_SEARCH) { + if(found) { + logger::debug("XLinkResetRemoteTimeout({}) post-reboot({}s) unusable state {}", + previousLinkId, + BOOTUP_SEARCH.count(), + rebootingDeviceInfo.toString()); + } else { + logger::debug("XLinkResetRemoteTimeout({}) post-reboot({}s) can't find device", previousLinkId, BOOTUP_SEARCH.count()); } + break; } - } while(!found && steady_clock::now() - t1 < BOOTUP_SEARCH); + std::this_thread::sleep_for(POLLING_DELAY_TIME); + }; } - logger::debug("XLinkResetRemote of linkId: ({})", previousLinkId); + logger::debug("XLinkResetRemoteTimeout({})", previousLinkId); } - - closed = true; } XLinkConnection::~XLinkConnection() { @@ -378,8 +377,6 @@ void XLinkConnection::initDevice(const DeviceInfo& deviceToInit, XLinkDeviceStat XLinkError_t rc = X_LINK_ERROR; - using namespace std::chrono; - // if device is in UNBOOTED then boot bootDevice = deviceToInit.state == X_LINK_UNBOOTED; @@ -394,16 +391,15 @@ void XLinkConnection::initDevice(const DeviceInfo& deviceToInit, XLinkDeviceStat {"DEPTHAI_BOOTUP_TIMEOUT", &bootupTimeout}, }; - for(auto ev : evars) { - auto name = ev.first; - auto valstr = utility::getEnv(name); + for(const auto& ev : evars) { + const auto valstr = utility::getEnv(ev.first); if(!valstr.empty()) { try { std::chrono::milliseconds value{std::stoi(valstr)}; *ev.second = value; // auto initial = *ev.second; } catch(const std::invalid_argument& e) { - logger::warn("{} value invalid: {}", name, e.what()); + logger::warn("{} value invalid: {}", ev.first, e.what()); } } } @@ -430,14 +426,10 @@ void XLinkConnection::initDevice(const DeviceInfo& deviceToInit, XLinkDeviceStat lastDeviceInfo = DeviceInfo(foundDeviceDesc); - bool bootStatus; - if(bootWithPath) { - bootStatus = bootAvailableDevice(foundDeviceDesc, pathToMvcmd); - } else { - bootStatus = bootAvailableDevice(foundDeviceDesc, mvcmd); - } + const bool bootStatus = bootWithPath ? bootAvailableDevice(foundDeviceDesc, pathToMvcmd) : bootAvailableDevice(foundDeviceDesc, mvcmd); if(!bootStatus) { - throw std::runtime_error("Failed to boot device!"); + // this throw will cause an app to crash if not caught by a try/catch around the app's `Device` constructor + throw std::runtime_error("Failed to begin the boot of a device!"); } } @@ -453,7 +445,7 @@ void XLinkConnection::initDevice(const DeviceInfo& deviceToInit, XLinkDeviceStat // Use "name" as hint only, but might still change bootedDescInfo.nameHintOnly = true; - logger::debug("Searching for booted device: {}, name used as hint only", bootedDeviceInfo.toString()); + logger::debug("Searching for booted device: {}, name/path used as hint only", bootedDeviceInfo.toString()); // Find booted device deviceDesc_t foundDeviceDesc = {}; diff --git a/src/xlink/XLinkStream.cpp b/src/xlink/XLinkStream.cpp index fa9393dee..f6bd1c67d 100644 --- a/src/xlink/XLinkStream.cpp +++ b/src/xlink/XLinkStream.cpp @@ -13,7 +13,7 @@ namespace dai { constexpr std::chrono::milliseconds XLinkStream::WAIT_FOR_STREAM_RETRY; constexpr int XLinkStream::STREAM_OPEN_RETRIES; -XLinkStream::XLinkStream(const std::shared_ptr conn, const std::string& name, std::size_t maxWriteSize) : connection(conn), streamName(name) { +XLinkStream::XLinkStream(const std::shared_ptr& conn, const std::string& name, std::size_t maxWriteSize) : connection(conn), streamName(name) { if(name.empty()) throw std::invalid_argument("Cannot create XLinkStream using empty stream name"); if(!connection || connection->getLinkId() == -1) throw std::invalid_argument("Cannot create XLinkStream using unconnected XLinkConnection"); @@ -33,12 +33,12 @@ XLinkStream::XLinkStream(const std::shared_ptr conn, const std: } // Move constructor -XLinkStream::XLinkStream(XLinkStream&& other) +XLinkStream::XLinkStream(XLinkStream&& other) noexcept : connection(std::move(other.connection)), streamName(std::exchange(other.streamName, {})), streamId(std::exchange(other.streamId, INVALID_STREAM_ID)) { // Set other's streamId to INVALID_STREAM_ID to prevent closing } -XLinkStream& XLinkStream::operator=(XLinkStream&& other) { +XLinkStream& XLinkStream::operator=(XLinkStream&& other) noexcept { if(this != &other) { connection = std::move(other.connection); streamId = std::exchange(other.streamId, INVALID_STREAM_ID); diff --git a/tests/src/multiple_devices_test.cpp b/tests/src/multiple_devices_test.cpp index d65dcae65..eb258905d 100644 --- a/tests/src/multiple_devices_test.cpp +++ b/tests/src/multiple_devices_test.cpp @@ -1,3 +1,6 @@ +#define CATCH_CONFIG_MAIN +#include + #include #include #include @@ -10,26 +13,31 @@ using namespace std; using namespace std::chrono; using namespace std::chrono_literals; -constexpr auto NUM_MESSAGES = 50; -constexpr auto TEST_TIMEOUT = 20s; +size_t maxFoundDevices = 0; +constexpr auto DEVICE_SEARCH_TIMEOUT = 10s; -int main() { +TEST_CASE("Multiple devices with 50 messages each") { + constexpr auto NUM_MESSAGES = 50; + constexpr auto TEST_TIMEOUT = 20s; mutex mtx; vector threads; vector, int>> devices; int deviceCounter = 0; - // Wait for 3s to acquire more than 1 device. Otherwise perform the test with 1 device + // Wait to acquire more than 1 device. Otherwise perform the test with 1 device // Could also fail instead - but requires test groups before auto t1 = steady_clock::now(); vector availableDevices; do { availableDevices = dai::Device::getAllAvailableDevices(); this_thread::sleep_for(500ms); - } while(availableDevices.size() <= 1 && steady_clock::now() - t1 <= 3s); + } while((availableDevices.size() < 2 || availableDevices.size() < maxFoundDevices) && (steady_clock::now() - t1 <= DEVICE_SEARCH_TIMEOUT)); + REQUIRE(!availableDevices.empty()); + REQUIRE(availableDevices.size() >= maxFoundDevices); + maxFoundDevices = availableDevices.size(); for(const auto& dev : availableDevices) { - threads.push_back(thread([&mtx, &devices, dev, deviceCounter]() { + threads.emplace_back([&mtx, &devices, dev, deviceCounter]() { // Create pipeline dai::Pipeline pipeline; @@ -54,7 +62,6 @@ int main() { std::ignore = deviceCounter; auto device = make_shared(pipeline, dev, dai::UsbSpeed::SUPER); - device->getOutputQueue("rgb", 4, false); cout << "MXID: " << device->getMxId() << endl; cout << "Connected cameras: "; @@ -64,8 +71,8 @@ int main() { cout << endl; unique_lock l(mtx); - devices.push_back({device, 0}); - })); + devices.emplace_back(device, 0); + }); deviceCounter++; } @@ -75,29 +82,199 @@ int main() { thread.join(); } - bool finished = false; + bool finished = devices.empty(); t1 = steady_clock::now(); + while (!finished && steady_clock::now() - t1 < TEST_TIMEOUT) { + finished = true; + for(auto& devCounter : devices) { + auto& dev = get<0>(devCounter); + auto& counter = get<1>(devCounter); + if(dev->getOutputQueue("rgb")->tryGet()) { + cout << "Device " << dev->getMxId() << " message arrived (" << ++counter << "/" << NUM_MESSAGES << ")" << endl; + } + + if(counter < NUM_MESSAGES) { + finished = false; + } + } + std::this_thread::sleep_for(1ms); + } + + REQUIRE(finished); +} + +TEST_CASE("Multiple devices created and destroyed in parallel", "[.][multi_test_devices]") { + constexpr auto TEST_TIMEOUT = 30s; + constexpr auto COLOR_FPS = 30; + constexpr auto MONO_FPS = 30; + constexpr auto COLOR_THRESHOLD = 0; + constexpr auto DEPTH_THRESHOLD = 0; + vector threads; + + // Wait to acquire devices. Require 2+ devices. + const auto t1 = steady_clock::now(); + vector availableDevices; do { - { - std::unique_lock l(mtx); - - finished = devices.size() > 0; - for(auto& devCounter : devices) { - auto& dev = get<0>(devCounter); - auto& counter = get<1>(devCounter); - if(dev->getOutputQueue("rgb")->tryGet()) { - cout << "Device " << dev->getMxId() << " message arrived (" << counter + 1 << "/" << NUM_MESSAGES << ")\n"; - counter++; + availableDevices = dai::Device::getAllAvailableDevices(); + this_thread::sleep_for(500ms); + } while((availableDevices.size() < 2 || availableDevices.size() < maxFoundDevices) && (steady_clock::now() - t1 <= DEVICE_SEARCH_TIMEOUT)); + REQUIRE(availableDevices.size() >= 2); + REQUIRE(availableDevices.size() >= maxFoundDevices); + maxFoundDevices = availableDevices.size(); + cout << "Found device count: " << availableDevices.size() << endl; + + // preallocate to avoid reallocation/move + threads.reserve(availableDevices.size()); + + // create a separate thread for every hardware device that configures color and depth nodes, + // starts the device, and creates a simple callback for these two data streams + for(const auto& selectedDevice : availableDevices) { + threads.emplace_back([=, &selectedDevice]() mutable { + while(steady_clock::now() - t1 < TEST_TIMEOUT) { + // Create pipeline + dai::Pipeline pipeline; + + // Define color source and output + auto camColor = pipeline.create(); + auto xoutColor = pipeline.create(); + xoutColor->setStreamName("color"); + camColor->video.link(xoutColor->input); + + // Color properties + camColor->setBoardSocket(dai::CameraBoardSocket::RGB); + camColor->setResolution(dai::ColorCameraProperties::SensorResolution::THE_1080_P); + camColor->setFps(static_cast(COLOR_FPS)); + + // Define stereo depth source and output + auto monoLeft = pipeline.create(); + auto monoRight = pipeline.create(); + auto stereo = pipeline.create(); + auto xoutDepth = pipeline.create(); + monoLeft->out.link(stereo->left); + monoRight->out.link(stereo->right); + xoutDepth->setStreamName("depth"); + stereo->depth.link(xoutDepth->input); + + // Depth properties + monoLeft->setResolution(dai::MonoCameraProperties::SensorResolution::THE_400_P); + monoLeft->setBoardSocket(dai::CameraBoardSocket::LEFT); + monoRight->setResolution(dai::MonoCameraProperties::SensorResolution::THE_400_P); + monoRight->setBoardSocket(dai::CameraBoardSocket::RIGHT); + monoLeft->setFps(static_cast(MONO_FPS)); + monoRight->setFps(static_cast(MONO_FPS)); + stereo->setDefaultProfilePreset(dai::node::StereoDepth::PresetMode::HIGH_DENSITY); + + // Create device + std::shared_ptr device; + try { + device = make_shared(pipeline, selectedDevice, dai::UsbSpeed::SUPER); + } + catch(const std::exception& e) { + FAIL_CHECK("Fail construct Device() with " << selectedDevice.toString() << " " << e.what()); + continue; + } + + // Call RPC + try { + cout << "Pipeline running on MXID: " << device->getMxId() << endl; + } + catch(const std::exception& e) { + FAIL_CHECK("Fail Device::getMxId() with " << selectedDevice.toString() << " " << e.what()); + continue; } - if(counter < NUM_MESSAGES) { - finished = false; + // Create queue callbacks for color and depth + if(COLOR_THRESHOLD > 0) { + auto colorQueue = device->getOutputQueue("color", 0, false); + colorQueue->addCallback( + [colorQueue, passedThreshold = false, COLOR_THRESHOLD](std::shared_ptr data) mutable { + auto frame = std::dynamic_pointer_cast(data); + auto sequenceNum = frame->getSequenceNum(); + if(sequenceNum < COLOR_THRESHOLD) { + if (sequenceNum % (COLOR_THRESHOLD / 10) == 0) + cout << '.'; + } else if(!passedThreshold) { + passedThreshold = true; + } + }); + } else { + } + if(DEPTH_THRESHOLD > 0) { + auto depthQueue = device->getOutputQueue("depth", 0, false); + depthQueue->addCallback( + [depthQueue, passedThreshold = false, DEPTH_THRESHOLD](std::shared_ptr data) mutable { + auto frame = std::dynamic_pointer_cast(data); + auto sequenceNum = frame->getSequenceNum(); + if(sequenceNum < DEPTH_THRESHOLD) { + if (sequenceNum % (DEPTH_THRESHOLD / 10) == 0) + cout << '.'; + } else if(!passedThreshold) { + passedThreshold = true; + } + }); + } else { } + SUCCEED("Successful Device with " << selectedDevice.toString()); } - } + }); + } - std::this_thread::sleep_for(1ms); - } while(!finished && steady_clock::now() - t1 < TEST_TIMEOUT); + // Join device threads + for(auto& thread : threads) { + thread.join(); + } +} + +TEST_CASE("Device APIs after Device::close()") { + constexpr auto TEST_TIMEOUT = 20s; + + // Wait to acquire a device. + auto t1 = steady_clock::now(); + vector availableDevices; + do { + availableDevices = dai::Device::getAllAvailableDevices(); + this_thread::sleep_for(500ms); + } while((availableDevices.empty() || availableDevices.size() < maxFoundDevices) && (steady_clock::now() - t1 <= DEVICE_SEARCH_TIMEOUT)); + REQUIRE(!availableDevices.empty()); + REQUIRE(availableDevices.size() >= maxFoundDevices); + maxFoundDevices = availableDevices.size(); + cout << "Found device count: " << availableDevices.size() << endl; + + // Create pipeline + dai::Pipeline pipeline; + + // Define color source and output + auto camColor = pipeline.create(); + auto xoutColor = pipeline.create(); + xoutColor->setStreamName("color"); + camColor->video.link(xoutColor->input); + + // Color properties + camColor->setBoardSocket(dai::CameraBoardSocket::RGB); + camColor->setResolution(dai::ColorCameraProperties::SensorResolution::THE_1080_P); + camColor->setFps(30.0f); + + // Create device + auto device = make_shared(pipeline, availableDevices[0], dai::UsbSpeed::SUPER); + cout << "MXID: " << device->getMxId() << "\nConnected cameras: "; + for(const auto& cam : device->getConnectedCameras()) { + cout << cam << " "; + } + cout << endl; + + // Get color queue and wait for one frame + bool ignore{}; + if(device->getOutputQueue("color")->get(TEST_TIMEOUT, ignore)) { + cout << "Device " << device->getMxId() << " message arrived" << endl; + } + + // Close device + device->close(); - return finished == 0; + // Validate Device API behaviors + CHECK_THROWS_AS(std::ignore = device->getMxId(), std::system_error); + CHECK_THROWS_AS(device->setXLinkChunkSize(1024), std::system_error); + CHECK_THROWS_AS(std::ignore = device->readCalibration(), std::system_error); + CHECK_NOTHROW(std::ignore = device->getDeviceInfo().name); + CHECK_NOTHROW(device->setLogOutputLevel(dai::LogLevel::WARN)); }