Skip to content

Commit

Permalink
Merge branch 'devel'
Browse files Browse the repository at this point in the history
  • Loading branch information
infradig committed Nov 27, 2023
2 parents 2e8b091 + a334368 commit 1a41db4
Show file tree
Hide file tree
Showing 6 changed files with 372 additions and 298 deletions.
5 changes: 5 additions & 0 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,7 @@ SRCOBJECTS = tpl.o \
src/bif_predicates.o \
src/bif_sregex.o \
src/bif_streams.o \
src/bif_tasks.o \
src/heap.o \
src/history.o \
src/library.o \
Expand Down Expand Up @@ -264,6 +265,10 @@ src/bif_streams.o: src/bif_streams.c src/heap.h src/internal.h \
src/skiplist/skiplist.h src/trealla.h src/cdebug.h src/stringbuf.h \
src/imath/imath.h src/imath/imrat.h src/module.h src/network.h src/parser.h src/prolog.h \
src/query.h src/builtins.h src/utf8/utf8.h
src/bif_tasks.o: src/bif_tasks.c src/bif_atts.h src/base64.h src/heap.h src/internal.h \
src/skiplist/skiplist.h src/trealla.h src/cdebug.h src/stringbuf.h \
src/imath/imath.h src/imath/imrat.h src/history.h src/library.h src/module.h src/sre/re.h \
src/parser.h src/prolog.h src/query.h src/builtins.h src/utf8/utf8.h
src/toplevel.o: src/toplevel.c src/heap.h src/internal.h \
src/skiplist/skiplist.h src/trealla.h src/cdebug.h src/stringbuf.h \
src/imath/imath.h src/imath/imrat.h src/history.h src/module.h src/parser.h src/prolog.h \
Expand Down
308 changes: 10 additions & 298 deletions src/bif_predicates.c
Original file line number Diff line number Diff line change
Expand Up @@ -37,25 +37,6 @@ static void msleep(int ms)
}
#endif

bool do_yield(query *q, int msecs)
{
if (!q->is_task)
return true;

q->yield_at = 0;
q->yielded = true;
q->tmo_msecs = get_time_in_usec() / 1000;
q->tmo_msecs += msecs > 0 ? msecs : 1;
check_heap_error(push_choice(q));
return false;
}

void do_yield_at(query *q, unsigned int time_in_ms)
{
q->yield_at = get_time_in_usec() / 1000;
q->yield_at += time_in_ms > 0 ? time_in_ms : 1;
}

void make_call(query *q, cell *tmp)
{
make_end(tmp);
Expand Down Expand Up @@ -87,20 +68,6 @@ static void init_queue(query *q)
static pl_idx queue_used(const query *q) { return q->qp[0]; }
static cell *get_queue(query *q) { return q->queue[0]; }

static cell *pop_queue(query *q)
{
if (!q->qp[0])
return NULL;

cell *c = q->queue[0] + q->popp;
q->popp += c->nbr_cells;

if (q->popp == q->qp[0])
q->popp = q->qp[0] = 0;

return c;
}

bool make_slice(query *q, cell *d, const cell *orig, size_t off, size_t n)
{
if (!n) {
Expand Down Expand Up @@ -4541,252 +4508,6 @@ static bool bif_is_stream_1(query *q)
return is_stream(p1);
}

static void push_task(query *q, query *task)
{
task->next = q->tasks;

if (q->tasks)
q->tasks->prev = task;

q->tasks = task;
}

static query *pop_task(query *q, query *task)
{
if (task->prev)
task->prev->next = task->next;

if (task->next)
task->next->prev = task->prev;

if (task == q->tasks)
q->tasks = task->next;

return task->next;
}

static bool bif_end_wait_0(query *q)
{
if (q->parent)
q->parent->end_wait = true;

return true;
}

static bool bif_wait_0(query *q)
{
while (q->tasks && !q->end_wait) {
CHECK_INTERRUPT();
uint64_t now = get_time_in_usec() / 1000;
query *task = q->tasks;
unsigned spawn_cnt = 0;
bool did_something = false;

while (task) {
CHECK_INTERRUPT();

if (task->spawned) {
spawn_cnt++;

if (spawn_cnt >= /*g_cpu_count*/64)
break;
}

if (task->tmo_msecs && !task->error) {
if (now <= task->tmo_msecs) {
task = task->next;
continue;
}

task->tmo_msecs = 0;
}

if (!task->yielded || !task->st.curr_cell || task->error) {
query *save = task;
task = pop_task(q, task);
query_destroy(save);
continue;
}

start(task);
task = task->next;
did_something = true;
}

if (!did_something)
msleep(1);
}

q->end_wait = false;
return true;
}

static bool bif_await_0(query *q)
{
while (q->tasks) {
CHECK_INTERRUPT();
pl_uint now = get_time_in_usec() / 1000;
query *task = q->tasks;
unsigned spawn_cnt = 0;
bool did_something = false;

while (task) {
CHECK_INTERRUPT();

if (task->spawned) {
spawn_cnt++;

if (spawn_cnt >= /*g_cpu_count*/64)
break;
}

if (task->tmo_msecs && !task->error) {
if (now <= task->tmo_msecs) {
task = task->next;
continue;
}

task->tmo_msecs = 0;
}

if (!task->yielded || !task->st.curr_cell || task->error) {
query *save = task;
task = pop_task(q, task);
query_destroy(save);
continue;
}

start(task);

if (!task->tmo_msecs && task->yielded) {
did_something = true;
break;
}
}

if (!did_something)
msleep(1);
else
break;
}

if (!q->tasks)
return false;

check_heap_error(push_choice(q));
return true;
}

static bool bif_yield_0(query *q)
{
if (q->retry)
return true;

return do_yield(q, 0);
}

static bool bif_task_n(query *q)
{
pl_idx save_hp = q->st.hp;
cell *p0 = deep_clone_to_heap(q, q->st.curr_cell, q->st.curr_frame);
GET_FIRST_RAW_ARG0(p1,callable,p0);
check_heap_error(init_tmp_heap(q));
check_heap_error(clone_to_tmp(q, p1));
unsigned arity = p1->arity;
unsigned args = 1;

while (args++ < q->st.curr_cell->arity) {
GET_NEXT_RAW_ARG(p2,any);
check_heap_error(append_to_tmp(q, p2));
arity++;
}

cell *tmp2 = get_tmp_heap(q, 0);
tmp2->nbr_cells = tmp_heap_used(q);
tmp2->arity = arity;
bool found = false;

if ((tmp2->match = search_predicate(q->st.m, tmp2, NULL)) != NULL) {
tmp2->flags &= ~FLAG_BUILTIN;
} else if ((tmp2->bif_ptr = get_builtin_term(q->st.m, tmp2, &found, NULL)), found) {
tmp2->flags |= FLAG_BUILTIN;
}

q->st.hp = save_hp;
cell *tmp = prepare_call(q, false, tmp2, q->st.curr_frame, 0);
query *task = query_create_task(q, tmp);
task->yielded = task->spawned = true;
push_task(q, task);
return true;
}

static bool bif_fork_0(query *q)
{
cell *curr_cell = q->st.curr_cell + q->st.curr_cell->nbr_cells;
query *task = query_create_task(q, curr_cell);
task->yielded = true;
push_task(q, task);
return false;
}

static bool bif_send_1(query *q)
{
GET_FIRST_ARG(p1,nonvar);
query *dstq = q->parent && !q->parent->done ? q->parent : q;
check_heap_error(init_tmp_heap(q));
cell *c = deep_clone_to_tmp(q, p1, p1_ctx);
check_heap_error(c);

for (pl_idx i = 0; i < c->nbr_cells; i++) {
cell *c2 = c + i;
share_cell(c2);
}

check_heap_error(alloc_on_queuen(dstq, 0, c));
q->yielded = true;
return true;
}

static bool bif_recv_1(query *q)
{
GET_FIRST_ARG(p1,any);

while (true) {
CHECK_INTERRUPT();
cell *c = pop_queue(q);
if (!c) break;

if (unify(q, p1, p1_ctx, c, q->st.curr_frame))
return true;

check_heap_error(alloc_on_queuen(q, 0, c));
}

return false;
}

static bool bif_sys_cancel_future_1(query *q)
{
GET_FIRST_ARG(p1,integer);
uint64_t future = get_smalluint(p1);

for (query *task = q->tasks; task; task = task->next) {
if (task->future == future) {
task->error = true;
break;
}
}

return true;
}

static bool bif_sys_set_future_1(query *q)
{
GET_FIRST_ARG(p1,integer);
q->future = get_smalluint(p1);
return true;
}

static bool bif_pid_1(query *q)
{
GET_FIRST_ARG(p1,var);
Expand Down Expand Up @@ -6722,6 +6443,16 @@ static void load_properties(module *m)
format_template(m, tmpbuf, sizeof(tmpbuf), ptr->name, ptr->arity, ptr, ptr->evaluable?true:false, true); SB_strcat(pr, tmpbuf);
}

for (const builtins *ptr = g_tasks_bifs; ptr->name; ptr++) {
sl_app(m->pl->biftab, ptr->name, ptr);
if (ptr->name[0] == '$') continue;
format_property(m, tmpbuf, sizeof(tmpbuf), ptr->name, ptr->arity, "built_in", ptr->evaluable?true:false); SB_strcat(pr, tmpbuf);
format_property(m, tmpbuf, sizeof(tmpbuf), ptr->name, ptr->arity, "static", ptr->evaluable?true:false); SB_strcat(pr, tmpbuf);
if (ptr->iso) { format_property(m, tmpbuf, sizeof(tmpbuf), ptr->name, ptr->arity, "iso", ptr->evaluable?true:false); SB_strcat(pr, tmpbuf); }
format_template(m, tmpbuf, sizeof(tmpbuf), ptr->name, ptr->arity, ptr, ptr->evaluable?true:false, false); SB_strcat(pr, tmpbuf);
format_template(m, tmpbuf, sizeof(tmpbuf), ptr->name, ptr->arity, ptr, ptr->evaluable?true:false, true); SB_strcat(pr, tmpbuf);
}

for (const builtins *ptr = g_ffi_bifs; ptr->name; ptr++) {
sl_app(m->pl->biftab, ptr->name, ptr);
if (ptr->name[0] == '$') continue;
Expand Down Expand Up @@ -7104,8 +6835,6 @@ builtins g_other_bifs[] =
{"$attributed_var", 1, bif_sys_attributed_var_1, "@variable", false, false, BLAH},
{"$first_non_octet", 2, bif_sys_first_non_octet_2, "+chars,-integer", false, false, BLAH},
{"$skip_max_list", 4, bif_sys_skip_max_list_4, "?integer,?integer?,?term,?term", false, false, BLAH},
{"$cancel_future", 1, bif_sys_cancel_future_1, "+integer", false, false, BLAH},
{"$set_future", 1, bif_sys_set_future_1, "+integer", false, false, BLAH},
{"$asserta", 2, bif_sys_asserta_2, "+term,+atom", true, false, BLAH},
{"$assertz", 2, bif_sys_assertz_2, "+term,+atom", true, false, BLAH},
{"$clause", 2, bif_sys_clause_2, "?term,?term", false, false, BLAH},
Expand All @@ -7115,22 +6844,5 @@ builtins g_other_bifs[] =
{"crypto_data_hash", 3, bif_crypto_data_hash_3, "?string,?string,?list", false, false, BLAH},
#endif

{"task", 1, bif_task_n, ":callable", false, false, BLAH},
{"task", 2, bif_task_n, ":callable,?term", false, false, BLAH},
{"task", 3, bif_task_n, ":callable,?term,?term", false, false, BLAH},
{"task", 4, bif_task_n, ":callable,?term,?term,?term", false, false, BLAH},
{"task", 5, bif_task_n, ":callable,?term,?term,?term,?term", false, false, BLAH},
{"task", 6, bif_task_n, ":callable,?term,?term,?term,?term,?term", false, false, BLAH},
{"task", 7, bif_task_n, ":callable,?term,?term,?term,?term,?term,?term", false, false, BLAH},
{"task", 8, bif_task_n, ":callable,?term,?term,?term,?term,?term,?term,?term", false, false, BLAH},

{"end_wait", 0, bif_end_wait_0, NULL, false, false, BLAH},
{"wait", 0, bif_wait_0, NULL, false, false, BLAH},
{"await", 0, bif_await_0, NULL, false, false, BLAH},
{"yield", 0, bif_yield_0, NULL, false, false, BLAH},
{"fork", 0, bif_fork_0, NULL, false, false, BLAH},
{"send", 1, bif_send_1, "+term", false, false, BLAH},
{"recv", 1, bif_recv_1, "?term", false, false, BLAH},

{0}
};
Loading

0 comments on commit 1a41db4

Please sign in to comment.