This is the new real-time kernel implementing Communicating Sequential Processes. With this software, a system is a collection of processes which communicate using channels. Systems can communicate with each other and with the environment by connecting to devices.
Here is our first coroutine:
struct hello_world : coroutine_t {
    size_t size() const { return sizeof(*this); }
    con_t *resume() {
        ::std::cout << "Hello World" << ::std::endl;
        return return_control();
    }
};
The code above is written out in full. It can be simplified a bit using some macros:
struct hello_world : coroutine_t {
    CSP_SIZE
    CSP_RESUME_START
        ::std::cout << "Hello World" << ::std::endl;
        CSP_COSUICIDE
    CSP_RESUME_END
};
int main() {
    alloc_ref_t system_allocator = new malloc_free_allocator_t;
    auto system = new system_t(system_allocator);
    auto process_allocator = system_allocator;
    auto hello = new(process_allocator) hello_world();
    csp_run(system, process_allocator, hello);
    delete system;
}
You can run csp anywhere by simply calling the csp_run function. It requires three arguments:
- A pointer to a system object
- A reference to a process allocator
- A pointer to a suspended, ready to run continuation object
The system constructor requires an allocator, which is used to allocate store for system objects. We have to use a reference-counting smart pointer of type alloc_ref_t to construct the system; here we construct it from a pointer to a malloc_free_allocator_t object.
The continuation object hello is an instance of the hello_world class. It is a suspended coroutine continuation, ready to run but not yet activated. The csp_run function will activate it.
This object must be constructed by the process allocator passed to the csp_run function. To do this, we pass that allocator to the new operator as an argument.
After execution is complete, we should delete the system object. The allocator object and continuation object will be deleted automatically.
Our system is ideally suited to stream processing. As a demonstration we will construct a simple pipeline whose components are connected by channels. The long-winded boilerplate is shown first, and then the simplifying macros. Ultimately this code will be generated by a higher-level language translator.
The simplest coroutine is the consumer, which reads data from its input channel and prints it.
template<class T>
struct printer : coroutine_t {
    CSP_SIZE
    // input channel endpoint reference
    chan_epref_t inp;
    // read control block
    io_request_t rreq;
    // place to put pointer to data
    T *data;
    con_t *resume() override {
        switch(pc++) {
        case 0:
            // setup the read control block
            rreq.svc_code = read_request_code_e;
            rreq.chan = &inp;
            rreq.pdata = (void**)&data;
            //::std::cout << fibre << " Consumer running " << ::std::endl;
            return this;
        case 1:
            //::std::cout << fibre << " Consumer performing read on " << inp.get() << ::std::endl;
            // perform the read
            SVC(&rreq)
        case 2:
            //::std::cout << fibre << " Consumer performed read on " << inp.get() << " got object at " << data << " value " << *data << ::std::endl;
            // print the value and dispose of the data
            ::std::cout << *data << ::std::endl;
            delete_concrete_object(data, fibre->process->process_allocator);
            // loop back for the next value
            pc = 1;
            return this;
        }
        return this; // dummy return to shut compiler up
    }
};
The first thing to note is that the system moves a pointer to the data from the writer to the reader, so our consumer must delete the data after using it. This is done using the delete_concrete_object function, a template which assumes the size of the data is determined by the type of the pointer.
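Conceptually it behaves something like the sketch below. The deallocate(void*, size_t) call is an assumed interface used here for illustration, not necessarily the library's actual one.
// Conceptual sketch only -- the real delete_concrete_object may differ.
// The allocator must be the one that constructed the object, and sizeof(T)
// must describe the whole allocation (no derived classes).
template<class T>
void delete_concrete_object(T *p, alloc_ref_t a) {
    p->~T();                      // run the destructor
    a->deallocate(p, sizeof(T));  // hand the sizeof(T) bytes back (assumed interface)
}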
It may seem that allocating and deallocating data all the time is inefficient. This is true with the simple malloc_free_allocator_t we will be using for demonstration. However, we provide very high performance allocators which greatly reduce the cost and are suitable for use in real-time, multithreaded signal processing applications.
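Because the allocator is passed around as an alloc_ref_t, substituting a different one is a one-line change in main(). For instance, the final program below layers a debugging_allocator_t over the malloc/free allocator:
alloc_ref_t system_allocator = new debugging_allocator_t(nullptr, new malloc_free_allocator_t, "System");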
The second thing to note is that the code is flat, assembler-level code. There are no local variables; we use only non-static data members. We don't even use a for loop!
This is because the machine stack must be empty when a service call is performed. In particular, the SVC macro returns control to the system scheduler, so any local variables would be lost. Indeed, the location in the code is lost as well!
To fix this, we use a special variable pc as the program counter. It is simply the integer value of the next case to be executed when the routine is **resume()**d.
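To see the technique in isolation, here is a minimal self-contained sketch (not using the kernel at all): each call to step() runs one segment of the routine, and the pc member records which case to execute on the next call, just as resume() does above.
#include <iostream>

// Stand-alone illustration of the resumption technique (not part of the kernel).
struct counter_steps {
    int pc = 0;   // index of the next case to execute
    int i = 0;    // persistent state lives in data members, never in locals

    // run one segment; returns false when there is nothing left to do
    bool step() {
        switch (pc++) {
        case 0:
            i = 1;
            return true;              // "suspend": control returns to the caller
        case 1:
            ::std::cout << i << ::std::endl;
            ++i;
            if (i <= 3) pc = 1;       // loop back to case 1 next time
            return true;
        }
        return false;                 // pc ran past the last case: finished
    }
};

int main() {
    counter_steps c;
    while (c.step()) {}               // the "scheduler" just keeps resuming it
}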
The second coroutine we need is a producer that spits out the integers 1 to 4.
struct producer : coroutine_t {
    CSP_SIZE
    // output channel endpoint reference
    chan_epref_t out;
    // service control block for write operation
    io_request_t wreq;
    // counter
    int counter;
    // data to be written
    int *data;
    con_t *resume() override {
        switch(pc++) {
        case 0:
            counter = 1;
            // set up the write control block
            wreq.svc_code = write_request_code_e;
            wreq.chan = &out;
            wreq.pdata = (void**)&data;
            return this;
        case 1:
            // data to be written
            //::std::cout << fibre << " Writing " << counter << ::std::endl;
            data = new(fibre->process->process_allocator) int(counter++);
            // write the data
            SVC(&wreq)
        case 2:
            // check for termination
            if (counter > 4) return return_control();
            // loop around to step 1
            pc = 1;
            return this;
        }
        return this; // dummy return to shut compiler up
    }
};
Now we need another coroutine to connect the actual pipeline.
struct init : coroutine_t {
    CSP_SIZE
    chan_epref_t inp;
    chan_epref_t out;
    spawn_fibre_request_t spawn_req;
    printer<int> *consumer;
    producer *prod;
    con_t *resume() override {
        switch(pc++) {
        case 0:
            ::std::cout << fibre << " Init running" << ::std::endl;
            // construct a new channel and return an endpoint
            inp = make_sequential_channel(fibre->process->system->system_allocator);
            // construct another endpoint of the same channel
            out = inp->dup();
            // set up the spawn request control block
            spawn_req.svc_code = spawn_fibre_deferred_request_code_e;
            // construct the consumer
            consumer = new(fibre->process->process_allocator) printer<int>;
            // assign the channel endpoint
            consumer->inp = inp;
            // spawn the consumer, deferring execution
            spawn_req.tospawn = consumer;
            SVC(&spawn_req);
        case 1:
            ::std::cout << fibre << " spawned printer, now spawning producer" << ::std::endl;
            // construct the producer
            prod = new(fibre->process->process_allocator) producer;
            // assign the channel endpoint
            prod->out = out;
            // spawn the producer, deferring execution
            spawn_req.tospawn = prod;
            SVC(&spawn_req);
        case 2:
            ::std::cout << fibre << " init suicide" << ::std::endl;
            // suicide
            return return_control();
        }
        return this; // dummy return to shut compiler up
    }
};
We have created a sequential channel with one endpoint and acquired a reference to it. We then call the dup method on the reference to obtain a new reference to another endpoint of the same channel.
Channel I/O in our system depends on the routines of a fibre having access to exactly one endpoint of a channel. If a fibre is given two endpoints, the automatic detection of an I/O operation that would block forever, due to the lack of a second endpoint being held by an active fibre, will not work, and the system may hang rather than terminate correctly.
Indeed, the fibre running the init routine breaks this rule, since it continues to hold access to an endpoint of the channel after passing a copy to another fibre. However, the routine does no I/O and terminates correctly, at which point the requirement is finally met, and blocked fibres can once again correctly lead to the termination of routines. Using deferred spawning allows the init routine to terminate first, so that the producer and consumer terminate at the earliest possible time.
[At this time there is no reset method to release an endpoint from its reference]
Notice carefully that the auto-increment of the pc variable in the switch statement argument suffices to manage control flow in sequential code, it being understood that the SVC macro implements a service call by returning control to the scheduler. It is therefore essential to add a case label after every return instruction, including every SVC, whose value is the value pc will have when the resume method is next called.
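Schematically, with req standing in for any of the request control blocks above (details elided), the pattern looks like this:
con_t *resume() override {
    switch(pc++) {
    case 0:
        // ... fill in the control block req, then issue the service call;
        // pc has already been incremented to 1, so the code that must run
        // after the call is labelled case 1
        SVC(&req)
    case 1:
        // ... execution continues here on the next resume()
        return return_control();
    }
    return this; // dummy return
}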
Finally we can run the pipeline.
int main() {
    alloc_ref_t system_allocator = new debugging_allocator_t(nullptr, new malloc_free_allocator_t, "System");
    auto system = new system_t(system_allocator);
    auto process_allocator = system_allocator;
    auto start = new(process_allocator) init;
    csp_run(system, process_allocator, start);
    ::std::cout << "System returned" << ::std::endl;
    delete system;
}
Here we simply construct a system allocator and copy the reference to it for the process allocator. We use the system allocator to construct a system object. Then we create a suspension from the coroutine init and pass it to the csp_run function along with the system object and process allocator.
The init object is then run as a fibre, creating our pipeline, and when all is done the csp_run function returns and we delete the system object.