Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

add Device tests for close and threading crashes #809

Open
wants to merge 9 commits into
base: develop
Choose a base branch
from
3 changes: 1 addition & 2 deletions include/depthai/device/DeviceBase.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -920,8 +920,7 @@ class DeviceBase {
std::chrono::steady_clock::time_point lastWatchdogPingTime;

// closed
mutable std::mutex closedMtx;
bool closed{false};
std::atomic<bool> closed{false};

// pimpl
class Impl;
Expand Down
7 changes: 3 additions & 4 deletions include/depthai/xlink/XLinkConnection.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -37,8 +37,8 @@ struct DeviceInfo {
std::string getMxId() const;
std::string toString() const;

std::string name = "";
std::string mxid = "";
std::string name;
std::string mxid;
XLinkDeviceState_t state = X_LINK_ANY_STATE;
XLinkProtocol_t protocol = X_LINK_ANY_PROTOCOL;
XLinkPlatform_t platform = X_LINK_ANY_PLATFORM;
Expand Down Expand Up @@ -148,8 +148,7 @@ class XLinkConnection {
DeviceInfo deviceInfo;

// closed
mutable std::mutex closedMtx;
bool closed{false};
std::atomic<bool> closed{false};

constexpr static std::chrono::milliseconds WAIT_FOR_BOOTUP_TIMEOUT{15000};
constexpr static std::chrono::milliseconds WAIT_FOR_CONNECT_TIMEOUT{5000};
Expand Down
10 changes: 5 additions & 5 deletions include/depthai/xlink/XLinkStream.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -43,11 +43,11 @@ class XLinkStream {
streamId_t streamId{INVALID_STREAM_ID};

public:
XLinkStream(const std::shared_ptr<XLinkConnection> conn, const std::string& name, std::size_t maxWriteSize);
XLinkStream(const std::shared_ptr<XLinkConnection>& conn, const std::string& name, std::size_t maxWriteSize);
XLinkStream(const XLinkStream&) = delete;
XLinkStream(XLinkStream&& stream);
XLinkStream(XLinkStream&& stream) noexcept;
XLinkStream& operator=(const XLinkStream&) = delete;
XLinkStream& operator=(XLinkStream&& stream);
XLinkStream& operator=(XLinkStream&& stream) noexcept;
~XLinkStream();

// Blocking
Expand Down Expand Up @@ -89,11 +89,11 @@ struct XLinkError : public std::runtime_error {
: runtime_error(message), status(statusID), streamName(std::move(stream)) {}
};
struct XLinkReadError : public XLinkError {
using XLinkError = XLinkError;
using XLinkError::XLinkError;
XLinkReadError(XLinkError_t status, const std::string& stream);
};
struct XLinkWriteError : public XLinkError {
using XLinkError = XLinkError;
using XLinkError::XLinkError;
XLinkWriteError(XLinkError_t status, const std::string& stream);
};

Expand Down
86 changes: 37 additions & 49 deletions src/device/DeviceBase.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -281,8 +281,6 @@ class DeviceBase::Impl {
DeviceLogger logger{"host", stdoutColorSink};

// RPC
std::mutex rpcMutex;
std::shared_ptr<XLinkStream> rpcStream;
std::unique_ptr<nanorpc::core::client<nanorpc::packer::nlohmann_msgpack>> rpcClient;

void setLogLevel(LogLevel level);
Expand Down Expand Up @@ -480,11 +478,10 @@ DeviceBase::DeviceBase(Config config, const DeviceInfo& devInfo) : deviceInfo(de
}

void DeviceBase::close() {
std::unique_lock<std::mutex> lock(closedMtx);
if(!closed) {
closeImpl();
closed = true;
}
// Only allow to close once
if(closed.exchange(true)) return;

closeImpl();
}

void DeviceBase::closeImpl() {
Expand Down Expand Up @@ -516,18 +513,13 @@ void DeviceBase::closeImpl() {
// At the end stop the monitor thread
if(monitorThread.joinable()) monitorThread.join();

// Close rpcStream
pimpl->rpcStream = nullptr;
pimpl->rpcClient = nullptr;

pimpl->logger.debug("Device closed, {}", duration_cast<milliseconds>(steady_clock::now() - t1).count());
}

// This function is thread-unsafe. The idea of "isClosed" is ephemerial and
// is invalidated during the return by value and continues to degrade in
// validity to the caller
bool DeviceBase::isClosed() const {
std::unique_lock<std::mutex> lock(closedMtx);
return closed || !watchdogRunning;
}

Expand All @@ -542,8 +534,10 @@ void DeviceBase::tryStartPipeline(const Pipeline& pipeline) {
}
} catch(const std::exception&) {
// close device (cleanup)
// can throw within itself, e.g. from the rpcclient lambda with an xlink error
close();
// Rethrow original exception

// Rethrow original exception when close() itself doesn't throw
throw;
}
}
Expand Down Expand Up @@ -673,7 +667,7 @@ void DeviceBase::init2(Config cfg, const dai::Path& pathToMvcmd, tl::optional<co
}

// Get embedded mvcmd or external with applied config
if(logger::get_level() == spdlog::level::debug) {
if(logger::get_level() <= spdlog::level::debug) {
nlohmann::json jBoardConfig = config.board;
pimpl->logger.debug("Device - BoardConfig: {} \nlibnop:{}", jBoardConfig.dump(), spdlog::to_hex(utility::serialize(config.board)));
}
Expand Down Expand Up @@ -726,32 +720,32 @@ void DeviceBase::init2(Config cfg, const dai::Path& pathToMvcmd, tl::optional<co
deviceInfo.state = expectedBootState;

// prepare rpc for both attached and host controlled mode
pimpl->rpcStream = std::make_shared<XLinkStream>(connection, device::XLINK_CHANNEL_MAIN_RPC, device::XLINK_USB_BUFFER_MAX_SIZE);
auto rpcStream = pimpl->rpcStream;

pimpl->rpcClient = std::make_unique<nanorpc::core::client<nanorpc::packer::nlohmann_msgpack>>([this, rpcStream](nanorpc::core::type::buffer request) {
// Lock for time of the RPC call, to not mix the responses between calling threads.
// Note: might cause issues on Windows on incorrect shutdown. To be investigated
std::unique_lock<std::mutex> lock(pimpl->rpcMutex);

// Log the request data
if(logger::get_level() == spdlog::level::trace) {
pimpl->logger.trace("RPC: {}", nlohmann::json::from_msgpack(request).dump());
}
pimpl->rpcClient = std::make_unique<nanorpc::core::client<nanorpc::packer::nlohmann_msgpack>>(
[rpcMutex = std::make_shared<std::mutex>(),
rpcStream = std::make_shared<XLinkStream>(connection, device::XLINK_CHANNEL_MAIN_RPC, device::XLINK_USB_BUFFER_MAX_SIZE),
&implLogger = this->pimpl->logger](nanorpc::core::type::buffer request) {
// Lock for time of the RPC call, to not mix the responses between calling threads.
// Note: might cause issues on Windows on incorrect shutdown. To be investigated
std::lock_guard<std::mutex> lock(*rpcMutex);

// Log the request data
if(logger::get_level() == spdlog::level::trace) {
implLogger.trace("RPC: {}", nlohmann::json::from_msgpack(request).dump());
}

try {
// Send request to device
rpcStream->write(std::move(request));

// Receive response back
// Send to nanorpc to parse
return rpcStream->read();
} catch(const std::exception& e) {
// If any exception is thrown, log it and rethrow
pimpl->logger.debug("RPC error: {}", e.what());
throw std::system_error(std::make_error_code(std::errc::io_error), "Device already closed or disconnected");
}
});
try {
// Send request to device
rpcStream->write(request);

// Receive response back
// Send to nanorpc to parse
return rpcStream->read();
} catch(const std::exception& e) {
// If any exception is thrown, log it and rethrow
implLogger.debug("RPC error: {}", e.what());
throw std::system_error(std::make_error_code(std::errc::io_error), "Device already closed or disconnected");
}
});

// prepare watchdog thread, which will keep device alive
// separate stream so it doesn't miss between potentially long RPC calls
Expand Down Expand Up @@ -912,19 +906,16 @@ void DeviceBase::init2(Config cfg, const dai::Path& pathToMvcmd, tl::optional<co
float rate = 1.0f;
while(profilingRunning) {
ProfilingData data = getProfilingData();
long long w = data.numBytesWritten - lastData.numBytesWritten;
long long r = data.numBytesRead - lastData.numBytesRead;
w /= rate;
r /= rate;

lastData = data;
const float w = (data.numBytesWritten - lastData.numBytesWritten) / rate;
const float r = (data.numBytesRead - lastData.numBytesRead) / rate;

pimpl->logger.debug("Profiling write speed: {:.2f} MiB/s, read speed: {:.2f} MiB/s, total written: {:.2f} MiB, read: {:.2f} MiB",
w / 1024.0f / 1024.0f,
r / 1024.0f / 1024.0f,
data.numBytesWritten / 1024.0f / 1024.0f,
data.numBytesRead / 1024.0f / 1024.0f);

lastData = std::move(data);
std::this_thread::sleep_for(duration<float>(1) / rate);
}
} catch(const std::exception& ex) {
Expand Down Expand Up @@ -1383,6 +1374,7 @@ bool DeviceBase::startPipelineImpl(const Pipeline& pipeline) {
const std::string streamAssetStorage = "__stream_asset_storage";
std::thread t1([this, &streamAssetStorage, &assetStorage]() {
XLinkStream stream(connection, streamAssetStorage, device::XLINK_USB_BUFFER_MAX_SIZE);
// TODO replace this code with XLinkStream::writeSplit()
int64_t offset = 0;
do {
int64_t toTransfer = std::min(static_cast<int64_t>(device::XLINK_USB_BUFFER_MAX_SIZE), static_cast<int64_t>(assetStorage.size() - offset));
Expand All @@ -1395,9 +1387,6 @@ bool DeviceBase::startPipelineImpl(const Pipeline& pipeline) {
t1.join();
}

// print assets on device side for test
pimpl->rpcClient->call("printAssets");

// Build and start the pipeline
bool success = false;
std::string errorMsg;
Expand All @@ -1406,7 +1395,6 @@ bool DeviceBase::startPipelineImpl(const Pipeline& pipeline) {
pimpl->rpcClient->call("startPipeline");
} else {
throw std::runtime_error(errorMsg);
return false;
}

return true;
Expand Down
2 changes: 1 addition & 1 deletion src/device/DeviceBootloader.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -220,7 +220,7 @@ std::vector<uint8_t> DeviceBootloader::createDepthaiApplicationPackage(
for(std::size_t i = 0; i < assetStorage.size(); i++) fwPackage[assetStorageSection->offset + i] = assetStorage[i];

// Debug
if(logger::get_level() == spdlog::level::debug) {
if(logger::get_level() <= spdlog::level::debug) {
SBR_SECTION* cur = &sbr.sections[0];
logger::debug("DepthAI Application Package");
for(; cur != lastSection + 1; cur++) {
Expand Down
2 changes: 1 addition & 1 deletion src/pipeline/datatype/StreamMessageParser.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -207,7 +207,7 @@ std::shared_ptr<ADatatype> StreamMessageParser::parseMessageToADatatype(streamPa
// RawBuffer is special case, no metadata is actually serialized
auto pBuf = std::make_shared<RawBuffer>();
pBuf->data = std::move(data);
return std::make_shared<Buffer>(pBuf);
return std::make_shared<Buffer>(std::move(pBuf));
} break;

case DatatypeEnum::ImgFrame:
Expand Down
4 changes: 2 additions & 2 deletions src/utility/Logging.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ namespace dai {
class Logging {
// private constructor
Logging();
~Logging() {}
~Logging() = default;

public:
static Logging& getInstance() {
Expand All @@ -38,7 +38,7 @@ class Logging {

// Public API
spdlog::logger logger;
spdlog::level::level_enum parseLevel(std::string lvl);
static spdlog::level::level_enum parseLevel(std::string lvl);
};


Expand Down
2 changes: 1 addition & 1 deletion src/utility/PimplImpl.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ Pimpl<T>::Pimpl( Args&& ...args )
: m{ new T{ std::forward<Args>(args)... } } { }

template<typename T>
Pimpl<T>::~Pimpl() { }
Pimpl<T>::~Pimpl() = default;

template<typename T>
T* Pimpl<T>::operator->() { return m.get(); }
Expand Down
11 changes: 5 additions & 6 deletions src/utility/XLinkGlobalProfilingLogger.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -25,19 +25,18 @@ void XLinkGlobalProfilingLogger::enable(bool en) {
XLinkProf_t prof;
XLinkGetGlobalProfilingData(&prof);

long long w = prof.totalWriteBytes - lastProf.totalWriteBytes;
long long r = prof.totalReadBytes - lastProf.totalReadBytes;
w /= rate.load();
r /= rate.load();
const auto rateSnapshot = rate.load();
const float w = (prof.totalWriteBytes - lastProf.totalWriteBytes) / rateSnapshot;
const float r = (prof.totalReadBytes - lastProf.totalReadBytes) / rateSnapshot;

logger::debug("Profiling global write speed: {:.2f} MiB/s, read speed: {:.2f} MiB/s, total written: {:.2f} MiB, read: {:.2f} MiB",
w / 1024.0f / 1024.0f,
r / 1024.0f / 1024.0f,
prof.totalWriteBytes / 1024.0f / 1024.0f,
prof.totalReadBytes / 1024.0f / 1024.0f);

lastProf = prof;
this_thread::sleep_for(duration<float>(1) / rate.load());
lastProf = std::move(prof);
this_thread::sleep_for(duration<float>(1) / rateSnapshot);
}
});
}
Expand Down
Loading