This repository has been archived by the owner on Jun 26, 2019. It is now read-only.

Thread Pool

kmussel edited this page Dec 17, 2010 · 1 revision

How Callbacks Are Done With Threading:

Overview:

CoreJS uses a thread pool to manage all the worker threads. This thread pool is a singleton class that is used to dispatch the work.

    ThreadPool * pool  =  ThreadPool::instance();
    pool->dispatch(SomeClass);   // SomeClass stands for a pointer to an object derived from ThreadBase

The dispatch call hands the object to a worker thread, which does the work. Once the work is done, the worker thread notifies the main thread.

The "SomeClass" object that is passed into the dispatch function must derive from the base class "ThreadBase". I'll get into how this works later.

The dispatch Function:

When the dispatch function is called, it first checks whether the number of threads created so far is less than the maximum that was set at initialization. If it is, a new WorkerThread object is created and passed to the "initialized" function via pthread_create(). If the number of threads is greater than or equal to the maximum, the argument is pushed onto a queue and executed as soon as a thread becomes available.
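The branching described above can be sketched roughly as follows. This is a simplified, single-threaded model and not the CoreJS source: `PoolModel`, `Work`, and `maxThreads` are hypothetical names, and no real threads are started. The comment in the first branch marks the point where the real pool would call pthread_create().

```cpp
#include <cassert>
#include <cstddef>
#include <functional>
#include <queue>
#include <string>
#include <utility>
#include <vector>

// Hypothetical stand-in for a unit of work (the real pool takes a ThreadBase*).
using Work = std::function<std::string()>;

// Simplified model of the dispatch decision. No real threads are created.
class PoolModel {
public:
    explicit PoolModel(std::size_t maxThreads) : maxThreads_(maxThreads) {
        assert(maxThreads > 0);
    }

    // Returns true if a new worker was started, false if the work was queued.
    bool dispatch(Work work) {
        if (workers_.size() < maxThreads_) {
            // The real pool would create a WorkerThread and hand it to
            // pthread_create() here; this model only records the assignment.
            workers_.push_back(std::move(work));
            return true;
        }
        // Pool is full: hold the work until a worker becomes available.
        pending_.push(std::move(work));
        return false;
    }

    std::size_t workerCount() const { return workers_.size(); }
    std::size_t pendingCount() const { return pending_.size(); }

private:
    std::size_t maxThreads_;
    std::vector<Work> workers_;  // one entry per worker started so far
    std::queue<Work> pending_;   // work waiting for a free worker
};
```

With a maximum of two threads, the first two dispatch calls start workers and the third lands on the queue.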

The initialized Function:

This is where the worker thread lives. It calls the function responsible for doing the work, which should return a string. This string is also the data that gets passed to the callback. Once the function is done executing, the thread sets a "ready" flag to 1 and notifies the main thread by writing a single character to the pipe. Afterwards, the thread goes into a wait state. Libevent then triggers the callback function ("onNotify") that we registered in the main file when we added the listener on the pipe.
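The notification step might look something like the sketch below. It is a single-threaded illustration that assumes a POSIX system: `WorkerState`, `runAndNotify`, and `notifyFd` are hypothetical names, the wait state is left out, and in the real pool this logic would run inside the thread started by pthread_create().

```cpp
#include <cassert>
#include <functional>
#include <string>
#include <unistd.h>  // write (POSIX)

// Hypothetical per-worker state; the real WorkerThread class may differ.
struct WorkerState {
    int ready = 0;       // set to 1 once the result is available
    std::string result;  // string later handed to the JavaScript callback
};

// Runs the work, publishes the result, then wakes the main thread by
// writing a single byte to the write end of the pipe (notifyFd).
// Returns true if the byte was written.
bool runAndNotify(WorkerState& state,
                  const std::function<std::string()>& work,
                  int notifyFd) {
    assert(notifyFd >= 0);
    state.result = work();
    state.ready = 1;
    const char token = 'x';  // the value is arbitrary; only its arrival matters
    return write(notifyFd, &token, 1) == 1;
}
```

In a genuinely multi-threaded version, access to the flag and the result would presumably need to be synchronized with the main thread, for example with the same mutex that guards the wait state.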

The onNotify Function:

  • It first reads from the pipe. This confirms that a notification actually arrived and drains the pipe so that other threads can write to it.

  • It then loops through all the worker threads, which are stored in a std::map in the ThreadPool class.

  • If a worker's "ready" flag has been set, it executes the JavaScript callback and decrements the event count.

  • After that it checks to see if there is any more work on the queue:

    • If yes, it assigns the work to a thread and signals the thread out of its wait state to begin working again.
    • If no, it checks to see if the event count is 0 and, if so, breaks out of the loop.
  • Outside the loop, it checks if the event count is 0 and, if so, deletes the event from the event_base. (Remember that as soon as there are no more events on the event_base, Libevent will break out of its loop, "event_base_dispatch(base)".)
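The loop described in the list above can be modelled without Libevent or real threads. The sketch below is only an approximation under stated assumptions: `PoolState`, `Worker`, `onNotifyModel`, and the `delivered` vector (which stands in for running the JavaScript callbacks) are hypothetical names, the pipe read is omitted, and the queue check is assumed to happen only for a worker that has just finished.

```cpp
#include <cassert>
#include <functional>
#include <map>
#include <queue>
#include <string>
#include <utility>
#include <vector>

using Work = std::function<std::string()>;

// Hypothetical worker record; the real WorkerThread class may differ.
struct Worker {
    bool ready = false;   // true once the worker has a result to deliver
    std::string result;   // data for the callback
    Work next;            // work assigned for the worker's next run, if any
};

struct PoolState {
    std::map<int, Worker> workers;       // keyed by a hypothetical worker id
    std::queue<Work> pending;            // work waiting for a free worker
    int eventCount = 0;                  // work items not yet delivered
    std::vector<std::string> delivered;  // stands in for the JS callbacks
};

// Returns true when no events remain, which is the point where the caller
// would remove the pipe event from the event_base.
bool onNotifyModel(PoolState& pool) {
    for (auto& entry : pool.workers) {
        Worker& w = entry.second;
        if (!w.ready) continue;

        pool.delivered.push_back(w.result);  // real code: run the JS callback
        w.ready = false;
        assert(pool.eventCount > 0);
        --pool.eventCount;

        if (!pool.pending.empty()) {
            // Reuse the idle worker; the real code would also signal the
            // thread out of its wait state here.
            w.next = std::move(pool.pending.front());
            pool.pending.pop();
        } else if (pool.eventCount == 0) {
            break;  // nothing queued and nothing outstanding
        }
    }
    return pool.eventCount == 0;
}
```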

How To Use Threading Callbacks:

  • The first thing to do is make sure your class derives from CallbackTemplate, like this:
   class Http : CallbackTemplate<Http> {…..};
  • Then, in the function that JavaScript calls, you'll need to:

    • create a new instance of the class
    • set whatever arguments you need
    • set the callback argument to the JavaScript function (this callback argument lives in the base class)
    • call the setCallback function, passing it the instance of this class and the member function that the worker thread will execute (the setCallback function lives in the CallbackTemplate class)
    • call the dispatch function on the ThreadPool instance, passing it this class instance.
  • Here's an example of how the Http client is done:

  Http * c = new Http(); 
  c->host = url.host;	
  c->path = url.path;
  c->port = url.port;
  c->params = paramStr;
  c->callback = Persistent<Function>::New(Local<Function>::Cast(args[2]));
  c->setCallback(c, &Http::getPostData);
  tpool->dispatch(c);

And that's all you have to do.

Some Background Info:

The Problem: I needed a generic way to create classes and pass them to a new worker thread and have that thread execute a function to do the work.

Solutions:

Possible: I would need the ThreadPool dispatch function to take a base class so that different derived classes could be passed to it.
I would also need a function pointer so I could have a generic way to create new functions and have the worker thread execute them.

With just those two pieces I could build a semi-generic solution, but every function I wanted to assign to the function pointer would then need to take a Base class parameter and could no longer be used as a normal member function.
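To illustrate the drawback, here is a rough sketch of that plain function-pointer approach. `Base`, `WorkFn`, `Fetcher`, `fetchWork`, and `Job` are hypothetical names used only for this example and do not come from the CoreJS source.

```cpp
#include <cassert>
#include <string>

struct Base {
    virtual ~Base() {}
};

// With a plain function pointer, every work function must accept the base class.
typedef std::string (*WorkFn)(Base*);

struct Fetcher : Base {
    std::string host;
};

// Not a normal member function: it has no "this" pointer, so it has to
// cast the Base* back to the derived type before it can touch any members.
std::string fetchWork(Base* b) {
    Fetcher* self = dynamic_cast<Fetcher*>(b);
    assert(self != nullptr);
    return "fetched " + self->host;
}

// What a worker would hold: the object plus the function to run on it.
struct Job {
    Base* obj;
    WorkFn fn;
    std::string run() const { return fn(obj); }
};
```

Every new kind of work needs its own cast like the one in `fetchWork`, which is the boilerplate a pointer to a member function avoids.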

My Solution:

What I wanted was to be able to create a function that you could use normally, without having to pass any parameters to it, and still be able to use the "this" pointer.
So instead of a function pointer I would need a pointer to a member function.
To do this generically I would need to use a template that inherits from a base class.
Inside this template I have a pointer to my object and a pointer to my member function.

 //   Looks like this where "T" is my template typename.
 T* obj;                           // pointer to object
 std::string (T::*fpt)();   // pointer to member function

So when I call the setCallback function, these two variables are set.

In my base class I declare operator() as a virtual function so that the template can override it. I do this so I can execute my function through a base-class pointer like this: (*derivedBaseClass)();

And in my template I define the operator ().

 virtual std::string operator()()
 {
     // Call the stored member function on the stored object.
     return (obj->*fpt)();
 }
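Putting the pieces together, a minimal self-contained sketch of the pattern could look like this. The class names ThreadBase, CallbackTemplate, and setCallback come from the text above, but their bodies here are assumptions (including making operator() pure virtual), and `Greeter` is a hypothetical derived class standing in for something like Http.

```cpp
#include <cassert>
#include <string>

// Base class: lets the pool call any work item through one interface.
class ThreadBase {
public:
    virtual ~ThreadBase() {}
    virtual std::string operator()() = 0;  // assumed pure virtual here
};

// Template layer: stores the object and the member function to run on it.
template <typename T>
class CallbackTemplate : public ThreadBase {
public:
    CallbackTemplate() : obj(nullptr), fpt(nullptr) {}

    void setCallback(T* object, std::string (T::*method)()) {
        obj = object;
        fpt = method;
    }

    virtual std::string operator()() {
        assert(obj != nullptr && fpt != nullptr);
        return (obj->*fpt)();  // call the member function on the object
    }

private:
    T* obj;                   // pointer to object
    std::string (T::*fpt)();  // pointer to member function
};

// Hypothetical derived class; greet() is an ordinary member function
// that takes no parameters and can use "this" as usual.
class Greeter : public CallbackTemplate<Greeter> {
public:
    std::string name;
    std::string greet() { return "hello " + name; }
};
```

A worker that only knows about ThreadBase can then run the work with `(*base)()`, where `base` is a `ThreadBase*` pointing at a `Greeter` whose setCallback has been called with `&Greeter::greet`.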