Thread safety of r_sink_mt() #22

Closed
ygeunkim opened this issue Dec 10, 2024 · 9 comments

@ygeunkim

ygeunkim commented Dec 10, 2024

I initially assumed that r_sink_mt would be thread-safe because of its _mt suffix. However, when I used it in an OpenMP loop, it turned out not to be thread-safe.
Reviewing its code, I noticed that it writes to Rcpp::Rcout, which is inherently not thread-safe:

#ifdef SPDLOG_USE_STD_FORMAT
Rcpp::Rcout << formatted;
#else
Rcpp::Rcout << fmt::to_string(formatted);
#endif
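
A minimal sketch of why a mutex alone does not help here, using only the standard library. The names locked_sink and log_from_workers are hypothetical stand-ins and not part of spdlog or RcppSpdlog. The sketch assumes that an _mt sink guards its write with a std::mutex: the lock serializes writers, but every write still executes on the worker thread that called log(), which is exactly where calling into R is not allowed.

```cpp
#include <cassert>
#include <cstddef>
#include <mutex>
#include <string>
#include <thread>
#include <vector>

// Hypothetical stand-in for a mutex-guarded (*_mt) sink.
class locked_sink {
public:
    // Remember which thread created the sink (the "main" thread here).
    locked_sink() : owner_(std::this_thread::get_id()) {}

    // Serialized by the mutex, yet executed on the calling thread.
    void log(const std::string& msg) {
        std::lock_guard<std::mutex> lock(mutex_);
        lines_.push_back(msg);
        if (std::this_thread::get_id() != owner_) ++off_owner_writes_;
    }

    std::size_t line_count() const {
        std::lock_guard<std::mutex> lock(mutex_);
        return lines_.size();
    }

    // Number of writes that ran on a thread other than the owner.
    std::size_t off_owner_writes() const {
        std::lock_guard<std::mutex> lock(mutex_);
        return off_owner_writes_;
    }

private:
    const std::thread::id owner_;
    mutable std::mutex mutex_;
    std::vector<std::string> lines_;
    std::size_t off_owner_writes_ = 0;
};

// Start n_threads workers that each log one line, then join them all.
void log_from_workers(locked_sink& sink, int n_threads) {
    assert(n_threads >= 0);
    std::vector<std::thread> workers;
    for (int i = 0; i < n_threads; ++i) {
        workers.emplace_back([&sink, i] { sink.log("worker " + std::to_string(i)); });
    }
    for (auto& worker : workers) worker.join();
}
```

With four workers, all four lines arrive intact, and all four writes are counted as having run off the owning thread. If the write were a call into R's I/O instead of a push_back, each of those would be an R API call from a non-main thread.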

Loggers that write to stdout, such as stdout_logger_mt(), work as expected in the same OpenMP loop. Is there a way to implement a thread-safe logger with this package without relying on stdout?
Here is my minimal example.

// [[Rcpp::depends(RcppSpdlog)]]

#include <RcppSpdlog>
#include <spdlog/spdlog.h>
#include <spdlog/sinks/stdout_sinks.h>
#include <omp.h>

// [[Rcpp::export]]
void omp_spdlog() {
    const int size = 8;
    auto console = spdlog::stdout_logger_mt("console");
    console->set_pattern("[%n] [thread %t] %v");
    console->info("Number of threads: {}", omp_get_max_threads());
    
    #pragma omp parallel for
    for (int i = 0; i < size; ++i) {
        int thread_id = omp_get_thread_num();
        console->info("Thread {} processing index {}", thread_id, i);
    }
}

/*** R
omp_spdlog()
*/

If I use spdlog::r_sink_mt("console") instead, the R console reports an error:

[console] [thread 438466] Thread 0 processing index 0
Error: C stack usage  17556442392504 is too close to the limit
R 4.4.2 exited unexpectedly: exit code -1

Environment:

> sessionInfo()
R version 4.4.2 (2024-10-31)
Platform: x86_64-apple-darwin20
Running under: macOS Sequoia 15.1.1

Matrix products: default
BLAS:   /System/Library/Frameworks/Accelerate.framework/Versions/A/Frameworks/vecLib.framework/Versions/A/libBLAS.dylib 
LAPACK: /Library/Frameworks/R.framework/Versions/4.4-x86_64/Resources/lib/libRlapack.dylib;  LAPACK version 3.12.0

locale:
[1] en_US.UTF-8/en_US.UTF-8/en_US.UTF-8/C/en_US.UTF-8/en_US.UTF-8

time zone: Asia/Seoul
tzcode source: internal

attached base packages:
[1] stats     graphics  grDevices utils     datasets  methods   base     

loaded via a namespace (and not attached):
[1] compiler_4.4.2 cli_3.6.3      tools_4.4.2    jsonlite_1.8.8 rlang_1.1.4
@eddelbuettel
Owner

eddelbuettel commented Dec 10, 2024

There may well be a way, as there is upstream, but I have not had time to dig -- maybe you can. I assume you are aware of the limitations that R has when it comes to multithreaded code: you simply cannot call back into R from any threaded portion. The vignette of package RcppParallel describes this well. But conceivably, with data in C++ data types that are not shared with R, this may work. You may want to try file-based or other loggers, as Rcpp::Rcout is obviously R code (on purpose: it uses the R I/O, see Writing R Extensions) and would violate the previous rule.
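
One way to respect that rule is to keep worker threads entirely in C++ data structures and leave the actual printing to the main thread. The sketch below is a hedged, standard-library-only illustration. deferred_printer and run_workers are hypothetical names, not part of Rcpp, RcppSpdlog, or RcppThread, and a std::ostream stands in for the place where an R package would write to Rcpp::Rcout.

```cpp
#include <cassert>
#include <cstddef>
#include <mutex>
#include <ostream>
#include <sstream>
#include <string>
#include <thread>
#include <vector>

// Hypothetical helper: workers queue lines, only the owning thread prints.
class deferred_printer {
public:
    deferred_printer() : owner_(std::this_thread::get_id()) {}

    // Safe from any thread: touches only C++ state behind a mutex.
    void push(const std::string& line) {
        std::lock_guard<std::mutex> lock(mutex_);
        pending_.push_back(line);
    }

    // Must run on the owning (main) thread. Returns the number of lines written.
    std::size_t flush_to(std::ostream& out) {
        assert(std::this_thread::get_id() == owner_);
        std::vector<std::string> batch;
        {
            std::lock_guard<std::mutex> lock(mutex_);
            batch.swap(pending_);  // take the queued lines, release the lock early
        }
        for (const auto& line : batch) out << line << '\n';
        return batch.size();
    }

private:
    const std::thread::id owner_;
    std::mutex mutex_;
    std::vector<std::string> pending_;
};

// Start n_threads workers that each queue one line, then join them all.
void run_workers(deferred_printer& printer, int n_threads) {
    assert(n_threads >= 0);
    std::vector<std::thread> workers;
    for (int i = 0; i < n_threads; ++i) {
        workers.emplace_back([&printer, i] {
            printer.push("worker " + std::to_string(i) + " done");
        });
    }
    for (auto& worker : workers) worker.join();
}
```

The main thread calls run_workers() and then flush_to() once the parallel region has finished, so no output call ever happens on a worker thread. The trade-off is that messages appear only at flush time rather than as they are produced.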

@ygeunkim
Author

Thanks!
I found a solution using RcppThread.
Replacing Rcpp::Rcout in r_sink<Mutex> with RcppThread::Rcout achieves thread safety. A class derived from r_sink<Mutex> that makes this replacement is as thread-safe as the stdout sinks:

namespace mynamespace {

namespace sinks {

template<typename Mutex>
class rcppthread_sink : public spdlog::sinks::r_sink<Mutex> {
protected:
  void sink_it_(const spdlog::details::log_msg& msg) override {
    spdlog::memory_buf_t formatted;
    spdlog::sinks::base_sink<Mutex>::formatter_->format(msg, formatted);
    #ifdef SPDLOG_USE_STD_FORMAT
    RcppThread::Rcout << formatted;
    #else
    RcppThread::Rcout << fmt::to_string(formatted);
    #endif
  }

  void flush_() override {
    RcppThread::Rcout << std::flush;
  }
};

using rcppthread_sink_mt = rcppthread_sink<std::mutex>;

} // namespace sinks

template<typename Factory = spdlog::synchronous_factory>
inline std::shared_ptr<spdlog::logger> rcppthread_sink_mt(const std::string &logger_name) {
    return Factory::template create<sinks::rcppthread_sink_mt>(logger_name);
}

} // namespace mynamespace

This mynamespace::rcppthread_sink_mt() can be used in the above minimal example.

@eddelbuettel
Owner

Oh, that is nice. I should document that (and give you credit).

@eddelbuettel
Owner

Very nice:

r> Rcpp::sourceCpp("/tmp/r/spdlogmt.cpp")
 -latomic -lpthread
> omp_spdlog()
[console] [thread 1388170] Number of threads: 12
[console] [thread 1390553] Thread 3 processing index 3
[console] [thread 1390551] Thread 1 processing index 1
[console] [thread 1390552] Thread 2 processing index 2
[console] [thread 1390554] Thread 4 processing index 4
[console] [thread 1390555] Thread 5 processing index 5
[console] [thread 1388170] Thread 0 processing index 0
> 

Any idea why I get -latomic -lpthread from RcppThread? Do you have that too?

Below is a complete version of your two files, amalgamated and slightly modified so that it can be called multiple times.

// [[Rcpp::depends(RcppSpdlog)]]
// [[Rcpp::depends(RcppThread)]]
// [[Rcpp::plugins(openmp)]]

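#include <memory>   // std::shared_ptr
#include <mutex>    // std::mutex
#include <string>   // std::string

#include <RcppThread.h>
#include <RcppSpdlog>
#include <spdlog/spdlog.h>
#include <spdlog/sinks/stdout_sinks.h>
#include <omp.h>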

namespace mynamespace {

  namespace sinks {

    template<typename Mutex>
    class rcppthread_sink : public spdlog::sinks::r_sink<Mutex> {
    protected:
      void sink_it_(const spdlog::details::log_msg& msg) override {
        spdlog::memory_buf_t formatted;
        spdlog::sinks::base_sink<Mutex>::formatter_->format(msg, formatted);
        #ifdef SPDLOG_USE_STD_FORMAT
        RcppThread::Rcout << formatted;
        #else
        RcppThread::Rcout << fmt::to_string(formatted);
        #endif
      }

      void flush_() override {
        RcppThread::Rcout << std::flush;
      }
    };

    using rcppthread_sink_mt = rcppthread_sink<std::mutex>;

  } // namespace sinks

  template<typename Factory = spdlog::synchronous_factory>
  inline std::shared_ptr<spdlog::logger> rcppthread_sink_mt(const std::string &logger_name) {
    return Factory::template create<sinks::rcppthread_sink_mt>(logger_name);
  }

} // namespace mynamespace


// [[Rcpp::export]]
void omp_spdlog() {
    const int size = 8;
    std::string logname = "console";
    auto console = spdlog::get(logname);
    if (console == nullptr) console = mynamespace::rcppthread_sink_mt(logname);
    spdlog::set_default_logger(console);

    console->set_pattern("[%n] [thread %t] %v");
    console->info("Number of threads: {}", omp_get_max_threads());

    #pragma omp parallel for
    for (int i = 0; i < size; ++i) {
        int thread_id = omp_get_thread_num();
        console->info("Thread {} processing index {}", thread_id, i);
    }
}

/*** R
omp_spdlog()
*/

@eddelbuettel
Owner

I committed this in d3d8ef7, please let me know if you think it needs another update. Thanks for the suggestion!

@ygeunkim
Author

Thanks! About -latomic -lpthread: I get the same message. It comes from the RcppThread package's flags; it uses its own LdFlags() in its src/Makevars, which adds those two flags.

@eddelbuettel
Owner

eddelbuettel commented Dec 11, 2024

Yes -- I was in a rush earlier and didn't look. I think it copies a (bad!!) setup I had once added in Rcpp, where the echo also goes to the screen. I think I emailed Thomas Nagler about it a while ago; he may have forgotten to fix it... 🤷‍♂️

It's only an issue for Rcpp::sourceCpp() and the use of the plugin so not the greatest problem in the world...

@ygeunkim
Author

Another solution is possible: the RcppThread package provides code that overrides std::cout with its thread-safe RcppThread::Rcout. We can apply the same approach to make the Rcpp::Rcout used in spdlog thread-safe.
The original implementation:

https://github.com/tnagler/RcppThread/blob/2de1797a1bcc08d02ea430de1fb75e9645126f2e/inst/include/RcppThread/Rcout.hpp#L56-L65

Here's how to use it:

#include <Rcpp.h>
#include <RcppThread.h> // using only #include <RcppThread/Rcout.hpp> is okay

#define Rcout RcppThreadRcout
namespace Rcpp {
	
static RcppThread::RPrinter RcppThreadRcout = RcppThread::RPrinter();
	
} // namespace Rcpp

#include <RcppSpdlog>

This is simpler than defining a derived class (though I haven't checked whether it affects the behavior of other Rcpp code). With it, spdlog::r_sink_mt() can be used in a thread-safe way. However, it requires careful header ordering:

  1. Include Rcpp and RcppThread headers
  2. Override Rcpp::Rcout
  3. Include RcppSpdlog header to compile r_sink_mt with overridden Rcout.
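
The three steps above rely on plain preprocessor token replacement, which can be sketched without Rcpp at all. Everything in this example is a hypothetical stand-in: demo_lib::Cout plays the role of Rcpp::Rcout, demo_lib::SafeCout the role of the RcppThread::RPrinter object, and demo_sink::write the role of the sink code compiled from the RcppSpdlog header. The #undef at the end exists only so the check can inspect both streams; the snippet in this thread leaves the macro defined.

```cpp
#include <cassert>
#include <sstream>
#include <string>

// Step 1: the "library" header declares its stream before any macro exists.
namespace demo_lib {
static std::ostringstream Cout;
}

// Step 2: rename the token, then define the replacement under the new name.
#define Cout SafeCout
namespace demo_lib {
static std::ostringstream SafeCout;
}

// Step 3: code compiled after the macro. The preprocessor rewrites
// demo_lib::Cout to demo_lib::SafeCout before the compiler sees it.
namespace demo_sink {
void write(const std::string& msg) {
    demo_lib::Cout << msg;
}
}

#undef Cout  // only so the check below can name the original stream again

// The write must land in the replacement stream, not in the original one.
void check_redirect() {
    demo_sink::write("hello");
    assert(demo_lib::SafeCout.str() == "hello");
    assert(demo_lib::Cout.str().empty());
}
```

This also shows why the ordering is fragile: any header included after the #define has every occurrence of the token rewritten, while anything compiled before it keeps using the original stream.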

@eddelbuettel
Owner

Sure. But if I may, and with all appreciation for the earlier help: if you want to make real improvements, learn to do a proper PR, and test it thoroughly before submitting. I cannot spend all my time combining the partial file snippets you put up here just because that is your current focus.

Either way, we have a remaining dependency on RcppThread. The best would probably be to (one day) talk to Thomas and see if he wants to 'donate' the monitor class used and the Rcout/Rcerr reimplementations to Rcpp. Then we'd be zero-depends there.
