diff --git a/ext/config.m4 b/ext/config.m4 index 46c6ba15..bf547351 100644 --- a/ext/config.m4 +++ b/ext/config.m4 @@ -386,6 +386,7 @@ EOF swow_signal.c \ swow_watchdog.c \ swow_closure.c \ + swow_siritz.c \ swow_ipaddress.c \ swow_http.c \ swow_websocket.c \ @@ -402,6 +403,11 @@ EOF SWOW_ADD_SOURCES(src, swow_weak_symbol.c, SWOW_INCLUDES, SWOW_CFLAGS) fi + dnl check if pthread have pthread_timedjoin_np + AC_CHECK_FUNCS([pthread_timedjoin_np], [ + AC_DEFINE([HAVE_PTHREAD_TIMEDJOIN_NP], 1, [Have pthread_timedjoin_np]) + ]) + dnl TODO: may use separate libcat if test "libcat" != ""; then diff --git a/ext/include/swow_siritz.h b/ext/include/swow_siritz.h index 11ec6b37..c8de3fbf 100644 --- a/ext/include/swow_siritz.h +++ b/ext/include/swow_siritz.h @@ -5,6 +5,15 @@ #include "swow.h" +/* globals */ + +CAT_GLOBALS_STRUCT_BEGIN(swow_siritz) { + HashTable threads; // sub-threads for this thread +} CAT_GLOBALS_STRUCT_END(swow_siritz); + +#define SWOW_SIRITZ_G(x) CAT_GLOBALS_GET(swow_siritz, x) + +/* Siritz object */ typedef struct swow_siritz_s { smart_str callable; smart_str args; @@ -12,6 +21,7 @@ typedef struct swow_siritz_s { zend_object std; } swow_siritz_t; +/* Siritz run struct */ typedef struct swow_siritz_run_s { smart_str callable; smart_str args; @@ -26,4 +36,7 @@ static zend_always_inline swow_siritz_t *swow_siritz_get_from_object(zend_object zend_result swow_siritz_module_init(INIT_FUNC_ARGS); zend_result swow_siritz_module_shutdown(INIT_FUNC_ARGS); +zend_result swow_siritz_runtime_init(INIT_FUNC_ARGS); +zend_result swow_siritz_runtime_shutdown(INIT_FUNC_ARGS); + #endif // _SWOW_SIRITZ_H \ No newline at end of file diff --git a/ext/src/swow_main.c b/ext/src/swow_main.c index 70faf8da..bb872ffd 100644 --- a/ext/src/swow_main.c +++ b/ext/src/swow_main.c @@ -328,6 +328,7 @@ PHP_RINIT_FUNCTION(swow) swow_dns_runtime_init, swow_stream_runtime_init, swow_watchdog_runtime_init, + swow_siritz_runtime_init, #ifdef CAT_OS_WAIT swow_proc_open_runtime_init, #endif @@ -369,6 +370,7 @@ PHP_RSHUTDOWN_FUNCTION(swow) #ifdef CAT_OS_WAIT swow_proc_open_runtime_shutdown, #endif + swow_siritz_runtime_shutdown, swow_watchdog_runtime_shutdown, swow_stream_runtime_shutdown, swow_event_runtime_shutdown, diff --git a/ext/src/swow_siritz.c b/ext/src/swow_siritz.c index 23b9be04..5f098c75 100644 --- a/ext/src/swow_siritz.c +++ b/ext/src/swow_siritz.c @@ -1,8 +1,61 @@ #include "swow.h" #include "SAPI.h" #include "php_main.h" +#include "php_variables.h" #include "swow_siritz.h" #include "swow_closure.h" +#include "swow_hook.h" + +#ifndef HAVE_PTHREAD_TIMEDJOIN_NP +// from https://stackoverflow.com/a/11552244 +struct pthread_timedjoin_np_args { + int joined; + pthread_t td; + pthread_mutex_t mtx; + pthread_cond_t cond; + void **res; +}; + +static void *swow_pthread_timedjoin_np_waiter(void *ap) +{ + struct pthread_timedjoin_np_args *args = ap; + pthread_join(args->td, args->res); + pthread_mutex_lock(&args->mtx); + args->joined = 1; + pthread_mutex_unlock(&args->mtx); + pthread_cond_signal(&args->cond); + return 0; +} + +static int swow_pthread_timedjoin_np(pthread_t td, void **res, struct timespec *ts) +{ + pthread_t tmp; + int ret; + struct pthread_timedjoin_np_args args = { .td = td, .res = res }; + + pthread_mutex_init(&args.mtx, 0); + pthread_cond_init(&args.cond, 0); + pthread_mutex_lock(&args.mtx); + + ret = pthread_create(&tmp, 0, swow_pthread_timedjoin_np_waiter, &args); + if (!ret) + do ret = pthread_cond_timedwait(&args.cond, &args.mtx, ts); + while (!args.joined && ret != ETIMEDOUT); + + pthread_mutex_unlock(&args.mtx); + + pthread_cancel(tmp); + pthread_join(tmp, 0); + + pthread_cond_destroy(&args.cond); + pthread_mutex_destroy(&args.mtx); + + return args.joined ? 0 : ret; +} +#define pthread_timedjoin_np swow_pthread_timedjoin_np +#endif + +CAT_GLOBALS_DECLARE(swow_siritz); #define getThisSiritz(s) swow_siritz_t *s = swow_siritz_get_from_object(Z_OBJ_P(ZEND_THIS)) @@ -10,8 +63,6 @@ SWOW_API zend_class_entry *swow_siritz_ce; SWOW_API zend_class_entry *swow_siritz_exception_ce; -static HashTable siritz; - ZEND_BEGIN_ARG_INFO_EX(arginfo_class_Swow_Siritz___construct, 0, 1, 0) ZEND_ARG_TYPE_INFO(0, callable, IS_CALLABLE, 0) ZEND_ARG_VARIADIC_TYPE_INFO(0, data, IS_MIXED, 0) @@ -33,12 +84,15 @@ static PHP_METHOD(Swow_Siritz, __construct) return; } - smart_str str_callable = {0}; - // smart_str str_callable_dup = {0}; php_serialize_data_t var_hash; + + smart_str str_callable = {0}; PHP_VAR_SERIALIZE_INIT(var_hash); php_var_serialize(&str_callable, ZEND_CALL_ARG(execute_data, 1), &var_hash); PHP_VAR_SERIALIZE_DESTROY(var_hash); + if (EG(exception)) { + return; + } if (str_callable.s == NULL) { // serialize failed @@ -58,6 +112,9 @@ static PHP_METHOD(Swow_Siritz, __construct) PHP_VAR_SERIALIZE_INIT(var_hash); php_var_serialize(&str_args, &z_args, &var_hash); PHP_VAR_SERIALIZE_DESTROY(var_hash); + if (EG(exception)) { + return; + } zend_hash_destroy(&args); @@ -108,8 +165,8 @@ SWOW_API void swow_siritz_run(swow_siritz_run_t *call) const char *pe = call->callable.s->val + call->callable.s->len; int ret = php_var_unserialize( &z_code, - &p, - pe, + (const unsigned char **)&p, + (const unsigned char *)pe, &var_hash ); PHP_VAR_UNSERIALIZE_DESTROY(var_hash); @@ -127,8 +184,8 @@ SWOW_API void swow_siritz_run(swow_siritz_run_t *call) pe = call->args.s->val + call->args.s->len; ret = php_var_unserialize( &z_args, - &p, - pe, + (const unsigned char **)&p, + (const unsigned char *)pe, &var_hash ); PHP_VAR_UNSERIALIZE_DESTROY(var_hash); @@ -177,30 +234,30 @@ static PHP_METHOD(Swow_Siritz, run) // printf("%p %d %.*s\n", s->callable.s->val, s->callable.s->len, s->callable.s->len, s->callable.s->val); const swow_siritz_run_t *run = malloc(sizeof(*run)); - memcpy(run, (const swow_siritz_run_t[]){{ + memcpy((void *)run, (const swow_siritz_run_t[]){{ .callable = s->callable, .args = s->args, .server_context = SG(server_context), }}, sizeof(*run)); - + int ret = uv_thread_create_ex(&s->thread, (const uv_thread_options_t[]) {{ .flags = UV_THREAD_HAS_STACK_SIZE, .stack_size = 1024 * 1024, - }}, swow_siritz_run, run); + }}, (void *)swow_siritz_run, (void *)run); if (ret != 0) { - zend_throw_exception_ex(swow_siritz_exception_ce, 0, "Failed to create thread: %s", uv_strerror(ret)); + swow_throw_exception(swow_siritz_exception_ce, 0, "Failed to create thread: %s", uv_strerror(ret)); return; } // printf("add thread %p\n", s->thread); - zend_hash_str_add_ptr(&siritz, (const char *) &s->thread, sizeof(s->thread), "running"); + zend_hash_str_add_ptr(&SWOW_SIRITZ_G(threads), (const char *) &s->thread, sizeof(s->thread), "running"); RETURN_THIS(); } // returns enum for error code -ZEND_BEGIN_ARG_WITH_RETURN_TYPE_INFO_EX(arginfo_class_Swow_Siritz_wait, 0, 0, IS_LONG, 0) - ZEND_ARG_TYPE_INFO_WITH_DEFAULT_VALUE(0, timeout, IS_LONG, 0, "-1") +ZEND_BEGIN_ARG_WITH_RETURN_TYPE_INFO_EX(arginfo_class_Swow_Siritz_wait, 0, 0, IS_VOID, 0) + ZEND_ARG_TYPE_INFO_WITH_DEFAULT_VALUE(0, timeout, IS_LONG, 1, "null") ZEND_ARG_TYPE_INFO_WITH_DEFAULT_VALUE(0, kill, _IS_BOOL, 0, "false") ZEND_END_ARG_INFO() @@ -208,28 +265,72 @@ static PHP_METHOD(Swow_Siritz, wait) { getThisSiritz(s); - zend_long timeout = -1; + zend_long timeout = 0; zend_bool kill = false; + zend_bool timeout_is_null = true; - ZEND_PARSE_PARAMETERS_START(1, -1) - Z_PARAM_LONG(timeout) + ZEND_PARSE_PARAMETERS_START(0, 2) + Z_PARAM_OPTIONAL + Z_PARAM_LONG_OR_NULL(timeout, timeout_is_null) Z_PARAM_BOOL(kill) ZEND_PARSE_PARAMETERS_END(); - // todo: use uv_thread and mutex things + bool done = false; + if (!s->thread) { + zend_throw_exception(swow_siritz_exception_ce, "Thread not started", 0); + RETURN_THROWS(); + } + + // maybetodo: use uv_thread and mutex things +#ifdef CAT_OS_WIN + if (timeout_is_null) { + timeout = INFINITE; + } DWORD ret = WaitForSingleObject((HANDLE)s->thread, timeout); + if (ret == WAIT_OBJECT_0) { + done = true; + } if (kill) { TerminateThread((HANDLE)s->thread, 0); + done = true; + } +#elif defined(CAT_OS_UNIX_LIKE) + + if (timeout_is_null) { + pthread_join(s->thread, NULL); + done = true; + } else { + struct timespec ts; + clock_gettime(CLOCK_REALTIME, &ts); + ts.tv_sec += timeout / 1000; + ts.tv_nsec += (timeout % 1000) * 1000000; + if (ts.tv_nsec >= 1000000000) { + ts.tv_sec += 1; + ts.tv_nsec -= 1000000000; + } + + int ret = pthread_timedjoin_np(s->thread, NULL, &ts); + if (ret == 0) { + done = true; + } } - if (ret == WAIT_OBJECT_0 || kill) { - // printf("remove thread %p\n", s->thread); - zend_hash_str_del(&siritz, (const char *) &s->thread, sizeof(s->thread)); + if (kill) { + pthread_cancel(s->thread); + done = true; } +#else +# error "Unsupported OS" +#endif - RETURN_LONG(ret); + if (done) { + // printf("remove thread %p\n", s->thread); + zend_hash_str_del(&SWOW_SIRITZ_G(threads), (const char *) &s->thread, sizeof(s->thread)); + } else { + zend_throw_exception(swow_siritz_exception_ce, "Wait for thread timed out", 0); + } } ZEND_BEGIN_ARG_WITH_RETURN_TYPE_INFO_EX(arginfo_class_Swow_Siritz_getTid, 0, 0, IS_LONG, 0) @@ -239,10 +340,21 @@ static PHP_METHOD(Swow_Siritz, getTid) { getThisSiritz(s); + if (!s->thread) { + zend_throw_exception(swow_siritz_exception_ce, "Thread not started", 0); + RETURN_THROWS(); + } + #ifdef CAT_OS_WIN RETURN_LONG(GetThreadId((HANDLE)s->thread)); +#elif defined(CAT_OS_DARWIN) + ino_t tid; + pthread_threadid_np(s->thread, &tid); + RETURN_LONG(tid); +#elif defined(CAT_OS_LINUX) + RETURN_LONG(s->thread); #else - RETURN_LONG(s->thread->tid); +# error "Unsupported OS" #endif } @@ -252,8 +364,14 @@ ZEND_END_ARG_INFO() PHP_FUNCTION(swow_getmytid) { #ifdef CAT_OS_WIN RETURN_LONG(GetCurrentThreadId()); -#else +#elif defined(CAT_OS_DARWIN) + ino_t tid; + pthread_threadid_np(pthread_self(), &tid); + RETURN_LONG(tid); +#elif defined(CAT_OS_LINUX) RETURN_LONG(gettid()); +#else +# error "Unsupported OS" #endif } @@ -284,7 +402,8 @@ static const zend_function_entry swow_siritz_functions[] = { zend_result swow_siritz_module_init(INIT_FUNC_ARGS) { - zend_class_entry ce; + // zend_class_entry ce; + CAT_GLOBALS_REGISTER(swow_siritz); swow_siritz_ce = swow_register_internal_class( "Swow\\Siritz", NULL, swow_siritz_methods, @@ -302,22 +421,50 @@ zend_result swow_siritz_module_init(INIT_FUNC_ARGS) return FAILURE; } - zend_hash_init(&siritz, 0, NULL, NULL, 1); + return SUCCESS; +} + +zend_result swow_siritz_runtime_init(INIT_FUNC_ARGS) +{ + zend_hash_init(&SWOW_SIRITZ_G(threads), 0, NULL, NULL, 1); return SUCCESS; } -zend_result swow_siritz_module_shutdown(INIT_FUNC_ARGS) +zend_result swow_siritz_runtime_shutdown(INIT_FUNC_ARGS) { - ZEND_HASH_REVERSE_FOREACH_STR_KEY(&siritz, HANDLE t) { - // printf("wait for thread %p\n", t); +#ifdef CAT_OS_WIN + ZEND_HASH_REVERSE_FOREACH_STR_KEY(&siritz, zend_string *strkey) { + pthread_t t = *(pthread_t *)strkey->val; + // printf("shutdown: wait for thread %p\n", t); DWORD ret = WaitForSingleObject(t, 1000/* TODO: configurable */); if (ret == WAIT_TIMEOUT) { TerminateThread(t, 0); } } ZEND_HASH_FOREACH_END(); +#elif defined(CAT_OS_UNIX_LIKE) + ZEND_HASH_REVERSE_FOREACH_STR_KEY(&SWOW_SIRITZ_G(threads), zend_string *strkey) { + // wait for at most 1 second + pthread_t t = *(pthread_t *)strkey->val; + // printf("shutdown: wait for thread %p\n", t); + struct timespec ts; + clock_gettime(CLOCK_REALTIME, &ts); + ts.tv_sec += 1; + pthread_timedjoin_np(t, NULL, &ts); + // printf("shutdown: wait for thread %p done\n", t); + pthread_cancel(t); + } ZEND_HASH_FOREACH_END(); +#else +# error "Unsupported OS" +#endif return SUCCESS; } +zend_result swow_siritz_module_shutdown(INIT_FUNC_ARGS) +{ + CAT_GLOBALS_UNREGISTER(swow_siritz); + + return SUCCESS; +} diff --git a/ext/tests/swow_siritz/bad_args.phpt b/ext/tests/swow_siritz/bad_args.phpt new file mode 100644 index 00000000..fd664108 --- /dev/null +++ b/ext/tests/swow_siritz/bad_args.phpt @@ -0,0 +1,64 @@ +--TEST-- +swow_siritz: bad args +--SKIPIF-- + +--FILE-- +run(); +$siritz->wait(); + +echo "Done\n"; +?> +--EXPECTF-- +Create siritz normal +Create siritz arg unserializable +Create siritz callable unserializable +Create siritz callable with bad arg +%AFatal error: Uncaught ArgumentCountError: Too few arguments to function Closure::{closure}(), 0 passed and exactly 1 expected in %s +Stack trace: +#0 [internal function]: Closure->{closure}() +#1 {main} + thrown in %s +Done diff --git a/ext/tests/swow_siritz/create.phpt b/ext/tests/swow_siritz/create.phpt new file mode 100644 index 00000000..800c5782 --- /dev/null +++ b/ext/tests/swow_siritz/create.phpt @@ -0,0 +1,28 @@ +--TEST-- +swow_siritz: create +--SKIPIF-- + +--FILE-- +run(); + +?> +--EXPECT-- +Create siritz +Start siritz +I'm in siritz! diff --git a/ext/tests/swow_siritz/tid.phpt b/ext/tests/swow_siritz/tid.phpt new file mode 100644 index 00000000..95d47bdf --- /dev/null +++ b/ext/tests/swow_siritz/tid.phpt @@ -0,0 +1,35 @@ +--TEST-- +swow_siritz: tid +--SKIPIF-- + +--FILE-- +getTid(); +}, Swow\SiritzException::class, 0, 'Thread not started'); + +$siritz->run(); + +var_dump($siritz->getTid()); + +$siritz->wait(); + +echo "Done\n"; +?> +--EXPECTF-- +int(%d) +int(%d) +int(%d) +Done diff --git a/ext/tests/swow_siritz/wait.phpt b/ext/tests/swow_siritz/wait.phpt new file mode 100644 index 00000000..e0905f72 --- /dev/null +++ b/ext/tests/swow_siritz/wait.phpt @@ -0,0 +1,101 @@ +--TEST-- +swow_siritz: wait +--SKIPIF-- + +--FILE-- +run(); + +$siritz->wait(); + +echo "Done1\n"; + +echo "Create siritz2\n"; + +$siritz = new Siritz(function () { + msleep(100); +}); + +echo "Start siritz2\n"; + +$siritz->run(); + +Assert::throws(function () use ($siritz) { + $siritz->wait(10); +}, Swow\SiritzException::class, 0, 'Wait for thread timed out'); + +echo "Done2\n"; + +echo "Create siritz3\n"; + +$siritz = new Siritz(function () { + // do nothing +}); + +echo "Start siritz3\n"; + +$siritz->run(); + +msleep(50); + +$siritz->wait(); + +echo "Done3\n"; + +echo "Create siritz4\n"; + +$siritz = new Siritz(function () { + msleep(100); +}); + +echo "Start siritz4\n"; + +$siritz->run(); + +$siritz->wait(10, true); + +echo "Done4\n"; + +echo "Create siritz5\n"; + +$siritz = new Siritz(function () { + msleep(100); +}); + +Assert::throws(function () use ($siritz) { + $siritz->wait(10, true); +}, Swow\SiritzException::class, 0, 'Thread not started'); + +echo "Done5\n"; +?> +--EXPECT-- +Create siritz1 +Start siritz1 +I'm in siritz1! +Done1 +Create siritz2 +Start siritz2 +Done2 +Create siritz3 +Start siritz3 +Done3 +Create siritz4 +Start siritz4 +Done4 +Create siritz5 +Done5 diff --git a/ext/tests/swow_siritz/wait2.phpt b/ext/tests/swow_siritz/wait2.phpt new file mode 100644 index 00000000..056e1862 --- /dev/null +++ b/ext/tests/swow_siritz/wait2.phpt @@ -0,0 +1,122 @@ +--TEST-- +swow_siritz: wait2 +--SKIPIF-- + +--FILE-- +run(); +} + + +for ($i = 0; $i < 10; $i++) +{ + $siritz[$i]->wait(); +} + +echo "Done1\n"; + +echo "Create siritz2\n"; + +$siritz = []; +for ($i = 0; $i < 10; $i++) +{ + $siritz[$i] = new Siritz(function () { + require __DIR__ . '/../include/bootstrap.php'; + pseudo_random_sleep(); + echo "I'm in siritz2!\n"; + }); +} + +for ($i = 0; $i < 10; $i++) +{ + $siritz[$i]->run(); +} + +for ($i = 0; $i < 10; $i++) +{ + $siritz[$i]->wait(); +} + +echo "Done2\n"; + +echo "Create siritz3\n"; + +$siritz = []; +for ($i = 0; $i < 10; $i++) +{ + $siritz[$i] = new Siritz(function ($i) { + require __DIR__ . '/../include/bootstrap.php'; + pseudo_random_sleep(); + echo "I'm in siritz3 $i!\n"; + }, $i); + $siritz[$i]->run(); + $siritz[$i]->wait(); + echo "siritz3 end $i!\n"; +} + +echo "Done2\n"; + +?> +--EXPECT-- +Create siritz1 +I'm in siritz1! +I'm in siritz1! +I'm in siritz1! +I'm in siritz1! +I'm in siritz1! +I'm in siritz1! +I'm in siritz1! +I'm in siritz1! +I'm in siritz1! +I'm in siritz1! +Done1 +Create siritz2 +I'm in siritz2! +I'm in siritz2! +I'm in siritz2! +I'm in siritz2! +I'm in siritz2! +I'm in siritz2! +I'm in siritz2! +I'm in siritz2! +I'm in siritz2! +I'm in siritz2! +Done2 +Create siritz3 +I'm in siritz3 0! +siritz3 end 0! +I'm in siritz3 1! +siritz3 end 1! +I'm in siritz3 2! +siritz3 end 2! +I'm in siritz3 3! +siritz3 end 3! +I'm in siritz3 4! +siritz3 end 4! +I'm in siritz3 5! +siritz3 end 5! +I'm in siritz3 6! +siritz3 end 6! +I'm in siritz3 7! +siritz3 end 7! +I'm in siritz3 8! +siritz3 end 8! +I'm in siritz3 9! +siritz3 end 9! +Done2