Skip to content

Commit

Permalink
Merge pull request #673 from letFunny/snapshot-pool-integration
Browse files Browse the repository at this point in the history
Snapshot pool integration, first version
  • Loading branch information
cole-miller committed Jul 31, 2024
2 parents eb777c2 + defc3f3 commit 05ae53f
Show file tree
Hide file tree
Showing 7 changed files with 608 additions and 110 deletions.
4 changes: 4 additions & 0 deletions Makefile.am
Original file line number Diff line number Diff line change
Expand Up @@ -221,6 +221,7 @@ libraft_la_LDFLAGS = $(UV_LIBS)
raft_core_unit_test_SOURCES = \
$(libraft_la_SOURCES) \
src/lib/sm.c \
src/lib/threadpool.c \
src/tracing.c \
test/raft/unit/main_core.c \
test/raft/unit/test_byte.c \
Expand All @@ -238,6 +239,7 @@ raft_core_unit_test_LDADD = libtest.la
raft_core_integration_test_SOURCES = \
src/tracing.c \
src/lib/sm.c \
src/lib/threadpool.c \
test/raft/integration/main_core.c \
test/raft/integration/test_apply.c \
test/raft/integration/test_assign.c \
Expand All @@ -263,6 +265,7 @@ raft_core_integration_test_LDADD = libtest.la libraft.la

raft_core_fuzzy_test_SOURCES = \
src/lib/sm.c \
src/lib/threadpool.c \
src/tracing.c \
test/raft/fuzzy/main_core.c \
test/raft/fuzzy/test_election.c \
Expand Down Expand Up @@ -295,6 +298,7 @@ raft_uv_integration_test_SOURCES = \
$(libraft_la_SOURCES) \
src/tracing.c \
src/lib/sm.c \
src/lib/threadpool.c \
test/raft/integration/main_uv.c \
test/raft/integration/test_uv_init.c \
test/raft/integration/test_uv_append.c \
Expand Down
30 changes: 29 additions & 1 deletion src/lib/threadpool.c
Original file line number Diff line number Diff line change
@@ -1,13 +1,15 @@
#include "threadpool.h"
#include <assert.h>
#include <stddef.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <unistd.h>
#include <uv.h>
#include <uv/unix.h>
#include "../../src/lib/queue.h"
#include "../../src/lib/sm.h"
#include "../../src/utils.h"
#include "../tracing.h"

/**
* Planner thread state machine.
Expand Down Expand Up @@ -74,6 +76,9 @@ static const struct sm_conf planner_states[PS_NR] = {
},
};

static const uint64_t pool_thread_magic = 0xf344e2;
static uv_key_t thread_identifier_key;

enum {
THREADPOOL_SIZE_MAX = 1024,
};
Expand Down Expand Up @@ -125,6 +130,14 @@ struct pool_impl {
};
/* clang-format on */

/* Callback does not allow passing data, we use a static variable to report
* errors back. */
static int thread_key_create_err = 0;
static void thread_key_create(void) {
PRE(thread_key_create_err == 0);
thread_key_create_err = uv_key_create(&thread_identifier_key);
}

static inline bool pool_is_inited(const pool_t *pool)
{
return pool->pi != NULL;
Expand Down Expand Up @@ -325,6 +338,7 @@ static void worker(void *arg)
pool_work_t *w;
queue *q;

uv_key_set(&thread_identifier_key, (void*)pool_thread_magic);
uv_sem_post(ta->sem);
uv_mutex_lock(mutex);
for (;;) {
Expand Down Expand Up @@ -552,7 +566,17 @@ int pool_init(pool_t *pool,
return rc;
}

static uv_once_t once = UV_ONCE_INIT;
uv_once(&once, thread_key_create);
if (thread_key_create_err != 0) {
uv_close((uv_handle_t *)&pi->outq_async, NULL);
uv_mutex_destroy(&pi->outq_mutex);
free(pi);
return thread_key_create_err;
}

pool_threads_init(pool);

return 0;
}

Expand Down Expand Up @@ -581,6 +605,10 @@ void pool_close(pool_t *pool)
uv_mutex_unlock(&pi->mutex);
}

bool pool_is_pool_thread(void) {
return uv_key_get(&thread_identifier_key) == (void*)pool_thread_magic;
}

pool_t *pool_ut_fallback(void)
{
static pool_t pool;
Expand Down
2 changes: 2 additions & 0 deletions src/lib/threadpool.h
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
#define __THREAD_POOL__

#include <uv.h>
#include <stdbool.h>
#include "queue.h"

/**
Expand Down Expand Up @@ -111,6 +112,7 @@ void pool_queue_work(pool_t *pool,
enum pool_work_type type,
void (*work_cb)(pool_work_t *w),
void (*after_work_cb)(pool_work_t *w));
bool pool_is_pool_thread(void);

pool_t *pool_ut_fallback(void);

Expand Down
2 changes: 2 additions & 0 deletions src/raft.h
Original file line number Diff line number Diff line change
Expand Up @@ -379,6 +379,7 @@ struct raft_signature {
struct page_from_to page_from_to;
pageno_t cs_page_no;
enum raft_result result;
bool ask_calculated;
};
#define RAFT_SIGNATURE_VERSION 0

Expand All @@ -390,6 +391,7 @@ struct raft_signature_result {
unsigned int cs_nr;
pageno_t cs_page_no;
enum raft_result result;
bool calculated;
};
#define RAFT_SIGNATURE_RESULT_VERSION 0

Expand Down
Loading

0 comments on commit 05ae53f

Please sign in to comment.