Skip to content

Commit

Permalink
various bug fix, remove args_get, fix go_for sig
Browse files Browse the repository at this point in the history
- added `args_deferred` to force memory cleanup/release
- update examples `args` can be directly index accessed

** latest dependency update now reveal leaks this library has and example `co_future_wait` not running correctly **
  • Loading branch information
TheTechsTech committed Dec 23, 2024
1 parent 062e417 commit 44e5ba6
Show file tree
Hide file tree
Showing 10 changed files with 51 additions and 125 deletions.
36 changes: 9 additions & 27 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -89,19 +89,6 @@ There are five simple ways to create coroutines:
which call this function as an coroutine! */
int co_main(int, char **);

/* Calls fn (with args as arguments) in separated thread, returning without waiting
for the execution of fn to complete. The value returned by fn can be accessed through
the future object returned (by calling `co_async_get()`). */
C_API future *co_async(callable_t, void_t);

/* Returns the value of a promise, a future thread's shared object, If not ready this
function blocks the calling thread and waits until it is ready. */
C_API value_t co_async_get(future *);

/* Waits for the future thread's state to change. this function pauses current coroutine
and execute others until future is ready, thread execution has ended. */
C_API void co_async_wait(future *);

/* Creates/initialize the next series/collection of coroutine's created to be part of wait group,
same behavior of Go's waitGroups, but without passing struct or indicating when done.

Expand Down Expand Up @@ -696,30 +683,25 @@ int main ()
#include "coroutine.h"

// a non-optimized way of checking for prime numbers:
void_t is_prime(void_t arg)
{
int x = c_int(arg);
for (int i = 2; i < x; ++i)
if (x % i == 0) return (void_t)false;
return (void_t)true;
void *is_prime(args_t arg) {
int i, x = get_arg(arg).integer;
for (i = 2; i < x; ++i) if (x % i == 0) return thrd_value(false);
return thrd_value(true);
}

int co_main(int argc, char **argv)
{
int co_main(int argc, char **argv) {
int prime = 194232491;
// call function asynchronously:
future *f = co_async(is_prime, &prime);
future fut = thrd_async(is_prime, thrd_value(prime));

printf("checking...\n");
// Pause and run other coroutines
// until thread state changes.
co_async_wait(f);
thrd_wait(fut, co_yield_info);

printf("\n194232491 ");
// guaranteed to be ready (and not block)
// after wait returns
if (co_async_get(f).boolean)
printf("is prime!\n");
if (thrd_get(fut).boolean) // guaranteed to be ready (and not block) after wait returns
printf("is prime.\n");
else
printf("is not prime.\n");

Expand Down
30 changes: 7 additions & 23 deletions docs/index.md
Original file line number Diff line number Diff line change
Expand Up @@ -89,19 +89,6 @@ There are five simple ways to create coroutines:
which call this function as an coroutine! */
int co_main(int, char **);

/* Calls fn (with args as arguments) in separated thread, returning without waiting
for the execution of fn to complete. The value returned by fn can be accessed through
the future object returned (by calling `co_async_get()`). */
C_API future *co_async(callable_t, void_t);

/* Returns the value of a promise, a future thread's shared object, If not ready this
function blocks the calling thread and waits until it is ready. */
C_API value_t co_async_get(future *);

/* Waits for the future thread's state to change. this function pauses current coroutine
and execute others until future is ready, thread execution has ended. */
C_API void co_async_wait(future *);

/* Creates/initialize the next series/collection of coroutine's created to be part of wait group,
same behavior of Go's waitGroups, but without passing struct or indicating when done.

Expand Down Expand Up @@ -733,27 +720,24 @@ int main ()
#include "coroutine.h"
// a non-optimized way of checking for prime numbers:
void_t is_prime(void_t arg) {
int x = c_int(arg);
for (int i = 2; i < x; ++i)
if (x % i == 0) return (void_t)false;
return (void_t)true;
void *is_prime(args_t arg) {
int i, x = get_arg(arg).integer;
for (i = 2; i < x; ++i) if (x % i == 0) return thrd_value(false);
return thrd_value(true);
}
int co_main(int argc, char **argv) {
int prime = 194232491;
// call function asynchronously:
future *fut = co_async(is_prime, &prime);
future fut = thrd_async(is_prime, thrd_value(prime));
printf("checking...\n");
// Pause and run other coroutines
// until thread state changes.
co_async_wait(fut);
thrd_wait(fut, co_yield_info);
printf("\n194232491 ");
// guaranteed to be ready (and not block)
// after wait returns
if (co_async_get(fut).boolean)
if (thrd_get(fut).boolean) // guaranteed to be ready (and not block) after wait returns
printf("is prime.\n");
else
printf("is not prime.\n");
Expand Down
3 changes: 2 additions & 1 deletion examples/co_future_wait.c
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

// a non-optimized way of checking for prime numbers:
void *is_prime(args_t arg) {
RAII_HERE;
int i, x = get_arg(arg).integer;
for (i = 2; i < x; ++i) if (x % i == 0) return thrd_value(false);
return thrd_value(true);
Expand All @@ -10,7 +11,7 @@ void *is_prime(args_t arg) {
int co_main(int argc, char **argv) {
int prime = 194232491;
// call function asynchronously:
future *fut = thrd_async(is_prime, thrd_data(prime));
future fut = thrd_async(is_prime, thrd_value(prime));

printf("checking...\n");
thrd_wait(fut, co_yield_info);
Expand Down
11 changes: 6 additions & 5 deletions examples/go_multi_args.c
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,9 @@
#include "coroutine.h"

void *worker(args_t arg) {
int i, count = args_get(arg, 0).integer;
char *text = args_get(arg, 1).char_ptr;
args_deferred(arg);
int i, count = arg[0].integer;
char *text = arg[1].char_ptr;

for (i = 0; i < count; i++) {
printf("%s\n", text);
Expand All @@ -13,9 +14,9 @@ void *worker(args_t arg) {
}

int co_main(int argc, char **argv) {
go_for(worker, "is", 4, "a");
go_for(worker, "is", 2, "b");
go_for(worker, "is", 3, "c");
go_for(worker, 2, 4, "a");
go_for(worker, 2, 2, "b");
go_for(worker, 2, 3, "c");

sleep_for(100);

Expand Down
7 changes: 4 additions & 3 deletions examples/go_select.c
Original file line number Diff line number Diff line change
Expand Up @@ -17,9 +17,10 @@ int fibonacci(channel_t *c, channel_t *quit) {
} select_end;
}

void *func(void *args) {
channel_t *c = args_get(args, 0).object;
channel_t *quit = args_get(args, 1).object;
void *func(args_t args) {
args_deferred(args);
channel_t *c = args[0].object;
channel_t *quit = args[1].object;
int i;

defer(delete, c);
Expand Down
7 changes: 4 additions & 3 deletions examples/go_wait_group.c
Original file line number Diff line number Diff line change
@@ -1,7 +1,8 @@
#include "coroutine.h"

void *worker(void *arg) {
int wid = args_get(arg, 0).integer + 1;
void *worker(args_t arg) {
args_deferred(arg);
int wid = arg[0].integer + 1;
int id = co_id();

printf("Worker %d starting\n", wid);
Expand All @@ -25,7 +26,7 @@ int co_main(int argc, char **argv) {

wait_group_t wg = wait_group_by(50);
for (i = 0; i < 50; i++) {
cid[i] = go(worker, args_for("i", i));
cid[i] = go(worker, args_for(1, i));
}
wait_result_t wgr = wait_for(wg);

Expand Down
46 changes: 6 additions & 40 deletions include/coroutine.h
Original file line number Diff line number Diff line change
Expand Up @@ -578,40 +578,10 @@ C_API atomic_deque_t gq_sys;
C_API routine_t *EMPTY_T;

/* Generic simple union storage types. */
typedef union {
int integer;
unsigned int u_int;
signed long s_long;
unsigned long u_long;
long long long_long;
size_t max_size;
float point;
double precision;
bool boolean;
signed short s_short;
unsigned short u_short;
signed char schar;
unsigned char uchar;
unsigned char *uchar_ptr;
char *char_ptr;
char **array;
void_t object;
callable_t func;
const char const_char[512];
} value_t;

typedef struct values_s {
value_t value;
value_types type;
} values_t;

typedef values_type value_t;
typedef raii_values_t values_t;
typedef void_t(*callable_args_t)(args_t);

typedef struct {
value_types type;
value_t value;
} generics_t;

typedef enum {
ZE_OK = CO_NONE,
ZE_ERR = CO_NULL,
Expand Down Expand Up @@ -695,15 +665,11 @@ C_API uv_loop_t *co_loop(void);
#endif

/**
* Returns generic union `values_type` of argument, will auto `release/free`
* allocated memory when current coroutine return/exit.
*
* Must be called at least once to release `allocated` memory.
* Must be called at least once to release `allocated` memory, when current coroutine return/exit.
*
* @param params arbitrary arguments
* @param item index number
*/
C_API values_type args_get(void_t params, int item);
C_API void args_deferred(args_t args);

/* Return handle to current coroutine. */
C_API routine_t *co_active(void);
Expand Down Expand Up @@ -829,7 +795,7 @@ C_API value_t chan_recv(channel_t *);
/* Creates an coroutine of given function with argument,
and add to schedular, same behavior as Go in golang. */
C_API u32 go(callable_t, void_t);
C_API u32 go_for(callable_args_t fn, const char *desc, ...);
C_API u32 go_for(callable_args_t fn, size_t num_of_args, ...);

/* Creates an coroutine of given function with argument, and immediately execute. */
C_API void launch(func_t, void_t);
Expand Down Expand Up @@ -1021,7 +987,7 @@ C_API wait_result_t wait_for_ex(wait_group_t);
/* Returns results of the given completed coroutine id, value in union value_t storage format. */
C_API value_t wait_result(wait_result_t, u32);

C_API awaitable_t async_for(callable_args_t fn, const char *desc, ...);
C_API awaitable_t async_for(callable_args_t fn, size_t num_of_args, ...);
C_API awaitable_t async(callable_t fn, void_t arg);
C_API value_t await(awaitable_t task);

Expand Down
2 changes: 1 addition & 1 deletion include/uv_routine.h
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@
#define var_unsigned_int(arg) (arg).value.u_int
#define var_unsigned_long(arg) (arg).value.u_long
#define var_size_t(arg) (arg).value.max_size
#define var_const_char(arg) (arg).value.const_char
#define var_const_char(arg) (string_t)(arg).value.buffer
#define var_char(arg) (arg).value.schar
#define var_char_ptr(arg) (arg).value.char_ptr
#define var_bool(arg) (arg).value.boolean
Expand Down
16 changes: 3 additions & 13 deletions src/coroutine.c
Original file line number Diff line number Diff line change
Expand Up @@ -23,11 +23,8 @@ CO_FORCE_INLINE co_collector_t *co_collector_list(void) {
return coroutine_list();
}

values_type args_get(void_t params, int item) {
args_t args = (args_t)params;
void args_deferred(args_t args) {
args_deferred_set(args, co_scope());

return args[item];
}

void delete(void_t ptr) {
Expand Down Expand Up @@ -94,18 +91,11 @@ values_t *co_var(var_t *data) {
}

value_t co_value(void_t data) {
if (data)
return ((values_t *)data)->value;

RAII_LOG("attempt to get value on null");
return ((values_t *)0)->value;
return co_data((values_t *)data);
}

value_t co_data(values_t *data) {
if (data)
return data->value;

return ((values_t *)0)->value;
return raii_value(data);
}

CO_FORCE_INLINE void co_suspend(void) {
Expand Down
18 changes: 9 additions & 9 deletions src/scheduler.c
Original file line number Diff line number Diff line change
Expand Up @@ -1749,13 +1749,13 @@ CO_FORCE_INLINE void stack_set(u32 size) {
gq_sys.stacksize = size;
}

u32 go_for(callable_args_t fn, const char *desc, ...) {
u32 go_for(callable_args_t fn, size_t num_of_args, ...) {
va_list ap;
size_t i, count = simd_strlen(desc);
size_t i;

args_t params = args_for(0);
va_start(ap, desc);
for (i = 0; i < count; i++)
va_start(ap, num_of_args);
for (i = 0; i < num_of_args; i++)
vector_push_back(params, va_arg(ap, void_t));
va_end(ap);

Expand Down Expand Up @@ -1784,13 +1784,13 @@ static awaitable_t async_ex(callable_t fn, void_t arg) {
return awaitable;
}

awaitable_t async_for(callable_args_t fn, const char *desc, ...) {
awaitable_t async_for(callable_args_t fn, size_t num_of_args, ...) {
va_list ap;
size_t i, count = simd_strlen(desc);
size_t i;

args_t params = args_for(0);
va_start(ap, desc);
for (i = 0; i < count; i++)
va_start(ap, num_of_args);
for (i = 0; i < num_of_args; i++)
vector_push_back(params, va_arg(ap, void_t));
va_end(ap);

Expand Down Expand Up @@ -2439,7 +2439,7 @@ int main(int argc, char **argv) {
}

#ifdef UV_H
uv_replace_allocator(rp_malloc, rp_realloc, rp_calloc, rpfree);
// uv_replace_allocator(rp_malloc, rp_realloc, rp_calloc, rpfree);
#endif

create_routine(main_main, NULL, gq_sys.stacksize * 4, RUN_MAIN);
Expand Down

0 comments on commit 44e5ba6

Please sign in to comment.