Skip to content

Commit

Permalink
latest version
Browse files Browse the repository at this point in the history
  • Loading branch information
kassane committed Sep 29, 2023
1 parent 78bcf22 commit c5c5eca
Show file tree
Hide file tree
Showing 13 changed files with 289 additions and 160 deletions.
62 changes: 62 additions & 0 deletions .github/workflows/ci.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,62 @@
name: Benchmarking

on:
push:
pull_request:
schedule:
- cron: '13 8 * * 0'

jobs:
build:
strategy:
fail-fast: false
matrix:
runs-on: [ubuntu-latest]
runs-on: ${{ matrix.runs-on }}
steps:
- uses: actions/checkout@v3
with:
submodules: recursive
fetch-depth: 0
- uses: goto-bus-stop/setup-zig@v2
- uses: lukka/get-cmake@latest
with:
cmakeVersion: latest

- name: Tigerbeetle Installation
run: |
chmod +x $PWD/scripts/install.sh;
chmod +x $PWD/scripts/run_tigerbeetle.sh;
chmod +x $PWD/zig/run.sh;
$PWD/scripts/install.sh;
$PWD/scripts/run_tigerbeetle.sh
- name: Run - C Client
run: |
cd c
$PWD/run.sh
cd ..
## need refactoring
# - name: Run - Go Client
# run: |
# cd go
# $PWD/run.sh
# cd ..
# - name: Run - .Net Client
# run: |
# cd dotnet
# $PWD/run.sh
# cd ..
# - name: Run - Java Client
# run: |
# cd java
# $PWD/run.sh
# cd ..
- name: Run - Zig Client
run: |
cd zig
$PWD/run.sh
cd ..
- name: Kill TB Process
run: pkill -f tigerbeetle
2 changes: 2 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -7,3 +7,5 @@ c/zig-cache
c/zig-out
zig/zig-cache
zig/zig-out
tigerbeetle/
build/
33 changes: 33 additions & 0 deletions c/CMakeLists.txt
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
cmake_minimum_required(VERSION 3.14)
project(bench LANGUAGES C)

include(FetchContent)


find_package(tigerbeetle 0.3.0 QUIET)
if(NOT tigerbeetle_FOUND)
FetchContent_Declare(tigerbeetle GIT_REPOSITORY https://github.com/kassane/tigerbeetle-cpp.git
GIT_TAG main)
FetchContent_GetProperties(tigerbeetle)
set(APP_TARGETS ${PROJECT_NAME})
FetchContent_MakeAvailable(tigerbeetle)
endif()
find_package(Threads REQUIRED)

add_executable(${PROJECT_NAME} bench.c)
# Link the executable target with the tigerbeetle client library and Threads library
target_link_libraries(${PROJECT_NAME}
PRIVATE tb_client Threads::Threads
)
# Include the tigerbeetle headers
target_include_directories(${PROJECT_NAME} PUBLIC ${tigerbeetle_SOURCE_DIR}/include)
# Link against the tigerbeetle library directory
target_link_directories(${PROJECT_NAME} PUBLIC ${tigerbeetle_BINARY_DIR})
# Add a custom target to run the executable
add_custom_target(run
COMMAND ${PROJECT_NAME}
DEPENDS ${PROJECT_NAME}
WORKING_DIRECTORY ${CMAKE_BINARY_DIR}
)
# Set the "bench" target as the default target when building the project
set_property(DIRECTORY ${CMAKE_CURRENT_SOURCE_DIR} PROPERTY VS_STARTUP_PROJECT ${PROJECT_NAME})
245 changes: 129 additions & 116 deletions c/bench.c
Original file line number Diff line number Diff line change
@@ -1,139 +1,152 @@
#include <stdlib.h>
#include <pthread.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <pthread.h>
#include <sys/time.h>
#include "tb_client.h"
#include <tb_client.h>

// Synchronization context between the callback and the main thread.
typedef struct completion_context {
void* reply;
int size;
pthread_mutex_t lock;
pthread_cond_t cv;
void *reply;
int size;
pthread_mutex_t lock;
pthread_cond_t cv;

} completion_context_t;

// Completion function, called by tb_client no notify that a request as completed.
void on_completion(uintptr_t context, tb_client_t client, tb_packet_t* packet, const uint8_t* data, uint32_t size) {

// The user_data gives context to a request:
completion_context_t* ctx = (completion_context_t*)packet->user_data;

// Signaling the main thread we received the reply:
pthread_mutex_lock(&ctx->lock);
ctx->size = size;
ctx->reply = (void*)data;
pthread_cond_signal(&ctx->cv);
pthread_mutex_unlock(&ctx->lock);
// Completion function, called by tb_client no notify that a request as
// completed.
void on_completion(uintptr_t context, tb_client_t client, tb_packet_t *packet,
const uint8_t *data, uint32_t size) {

// The user_data gives context to a request:
completion_context_t *ctx = (completion_context_t *)packet->user_data;

// Signaling the main thread we received the reply:
pthread_mutex_lock(&ctx->lock);
ctx->size = size;
ctx->reply = (void *)data;
pthread_cond_signal(&ctx->cv);
pthread_mutex_unlock(&ctx->lock);
}

// For benchmarking purposes.
long long get_time_ms(void) {
struct timeval tv;
gettimeofday(&tv,NULL);
return (((long long)tv.tv_sec)*1000)+(tv.tv_usec/1000);
struct timeval tv;
gettimeofday(&tv, NULL);
return (((long long)tv.tv_sec) * 1000) + (tv.tv_usec / 1000);
}

int main(int argc, char **argv) {
tb_client_t client;
tb_packet_list_t packets;
const char* address = "127.0.0.1:3000";
TB_STATUS status = tb_client_init(
&client, // Output client.
&packets, // Output packet list.
0, // Cluster ID.
address, // Cluster addresses.
strlen(address), //
1, // MaxConcurrency == 1, this is a single-threaded program
NULL, // No need for a global context.
&on_completion // Completion callback.
);

if (status != TB_STATUS_SUCCESS) {
printf("Failed to initialize tb_client\n");
tb_client_t client;
tb_packet_t *packet;
const char *address = "127.0.0.1:3000";
TB_STATUS status = tb_client_init(
&client, // Output client.
0, // Cluster ID.
address, // Cluster addresses.
strlen(address), //
1, // MaxConcurrency == 1, this is a single-threaded program
NULL, // No need for a global context.
&on_completion // Completion callback.
);

if (status != TB_STATUS_SUCCESS) {
printf("Failed to initialize tb_client\n");
exit(-1);
}

// Initializing the mutex and condvar
completion_context_t ctx;

if (pthread_mutex_init(&ctx.lock, NULL) != 0) {
printf("Failed to initialize mutex\n");
exit(-1);
}

if (pthread_cond_init(&ctx.cv, NULL)) {
printf("Failed to initialize condition\n");
exit(-1);
}

pthread_mutex_lock(&ctx.lock);

const int SAMPLES = 1000000;
const int BATCH_SIZE = 8191;

// Repeat the same test 10 times and pick the best execution
for (int tries = 0; tries < 10; tries++) {

long max_latency_ms = 0;
long total_time_ms = 0;

for (int i = 0; i < SAMPLES; i += BATCH_SIZE) {

tb_transfer_t batch[BATCH_SIZE];
memset(&batch, 0, BATCH_SIZE);

for (int j = 0; (j < BATCH_SIZE) && (i + j < SAMPLES); j++) {
batch[j].id = 0;
batch[j].debit_account_id = 0;
batch[j].credit_account_id = 0;
batch[j].code = 1;
batch[j].ledger = 1;
batch[j].amount = 10;
}

// Acquiring a packet for this request:
if (tb_client_acquire_packet(client, &packet) != TB_PACKET_ACQUIRE_OK) {
printf("Too many concurrent packets\n");
exit(-1);
}
}

// Initializing the mutex and condvar
completion_context_t ctx;

if (pthread_mutex_init(&ctx.lock, NULL) != 0) {
printf("Failed to initialize mutex\n");
exit(-1);
}
packet->operation =
TB_OPERATION_CREATE_TRANSFERS; // The operation to be performed.
packet->data = &batch; // The data to be sent.
packet->data_size = BATCH_SIZE * sizeof(tb_transfer_t);
packet->user_data = &ctx; // User-defined context.
packet->status = TB_PACKET_OK; // Will be set when the reply arrives.

if (pthread_cond_init(&ctx.cv, NULL)) {
printf("Failed to initialize condition\n");
exit(-1);
}
long long now = get_time_ms();

tb_client_submit(client, packet);
pthread_cond_wait(&ctx.cv, &ctx.lock);

pthread_mutex_lock(&ctx.lock);

const int SAMPLES = 1000000;
const int BATCH_SIZE = 8191;

// Repeat the same test 10 times and pick the best execution
for (int tries = 0; tries < 10; tries++) {

long max_latency_ms = 0;
long total_time_ms = 0;

for (int i = 0; i < SAMPLES; i += BATCH_SIZE) {

tb_transfer_t batch[BATCH_SIZE];
memset(&batch, 0, BATCH_SIZE);

for (int j = 0; (j < BATCH_SIZE) && (i + j < SAMPLES); j++) {
batch[j].id = 0;
batch[j].debit_account_id = 0;
batch[j].credit_account_id = 0;
batch[j].code = 1;
batch[j].ledger = 1;
batch[j].amount = 10;
}

packets.head->operation = TB_OPERATION_CREATE_TRANSFERS; // The operation to be performed.
packets.head->data = &batch; // The data to be sent.
packets.head->data_size = BATCH_SIZE * sizeof(tb_transfer_t);
packets.head->user_data = &ctx; // User-defined context.
packets.head->status = TB_PACKET_OK; // Will be set when the reply arrives.

long long now = get_time_ms();

tb_client_submit(client, &packets);
pthread_cond_wait(&ctx.cv, &ctx.lock);

long elapsed_ms = get_time_ms() - now;

if (elapsed_ms > max_latency_ms) max_latency_ms = elapsed_ms;
total_time_ms += elapsed_ms;

if (packets.head->status != TB_PACKET_OK) {
// Checking if the request failed:
printf("Error calling create_transfers (ret=%d)\n", packets.head->status);
exit(-1);
}

// Since we are using invalid IDs,
// it is expected to all transfers to be rejected.
tb_create_transfers_result_t* results = (tb_create_transfers_result_t*)ctx.reply;
int results_len = ctx.size / sizeof(tb_create_transfers_result_t);
if (results_len != BATCH_SIZE) {
printf("Unexpected result %d\n", i);
exit(-1);
}
}

printf("Total time: %d ms\n", total_time_ms);
printf("Max time per batch: %d ms\n", max_latency_ms);
printf("Transfers per second: %d\n", SAMPLES * 1000 / total_time_ms);
printf("\n");
long elapsed_ms = get_time_ms() - now;

if (elapsed_ms > max_latency_ms)
max_latency_ms = elapsed_ms;
total_time_ms += elapsed_ms;

if (packet->status != TB_PACKET_OK) {
// Checking if the request failed:
printf("Error calling create_transfers (ret=%d)\n", packet->status);
exit(-1);
}

// Releasing the packet, so it can be used in a next request.
tb_client_release_packet(client, packet);

// Since we are using invalid IDs,
// it is expected to all transfers to be rejected.
tb_create_transfers_result_t *results =
(tb_create_transfers_result_t *)ctx.reply;
int results_len = ctx.size / sizeof(tb_create_transfers_result_t);
if (results_len != BATCH_SIZE) {
printf("Unexpected result %d\n", i);
exit(-1);
}
}

// Cleanup
pthread_mutex_unlock(&ctx.lock);
pthread_cond_destroy(&ctx.cv);
pthread_mutex_destroy(&ctx.lock);
tb_client_deinit(client);
printf("Total time: %d ms\n", total_time_ms);
printf("Max time per batch: %d ms\n", max_latency_ms);
printf("Transfers per second: %d\n", SAMPLES * 1000 / total_time_ms);
printf("\n");
}

// Cleanup
pthread_mutex_unlock(&ctx.lock);
pthread_cond_destroy(&ctx.cv);
pthread_mutex_destroy(&ctx.lock);
tb_client_deinit(client);
}
7 changes: 6 additions & 1 deletion c/run.sh
Original file line number Diff line number Diff line change
@@ -1 +1,6 @@
../tigerbeetle/zig/zig build run -Drelease-safe
#!/usr/bin/env bash

# zig build run -Doptimize=ReleaseSafe
cmake -B build -DCMAKE_BUILD_TYPE=Release
cmake --build build --target run

2 changes: 1 addition & 1 deletion go/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -2,4 +2,4 @@ module tigerbeetle-bench

go 1.17

require github.com/tigerbeetledb/tigerbeetle-go v0.0.0-20230515174739-c787f1046cd0
require github.com/tigerbeetledb/tigerbeetle-go v0.13.57
4 changes: 2 additions & 2 deletions go/go.sum
Original file line number Diff line number Diff line change
@@ -1,2 +1,2 @@
github.com/tigerbeetledb/tigerbeetle-go v0.0.0-20230515174739-c787f1046cd0 h1:5ygk+xhJ2p03J+viEZJ6GwKgw5BfQ2Dm3W/Q2h/063M=
github.com/tigerbeetledb/tigerbeetle-go v0.0.0-20230515174739-c787f1046cd0/go.mod h1:3Tjdd459YDuWJM1mHqafj/6r1lMXkqAJJGrhZ6Hsrv8=
github.com/tigerbeetledb/tigerbeetle-go v0.13.57 h1:Sn5SUXGXbE3mm5bzJZ+sQ7tq5Os8EoUcKx2nmzllXiQ=
github.com/tigerbeetledb/tigerbeetle-go v0.13.57/go.mod h1:3Tjdd459YDuWJM1mHqafj/6r1lMXkqAJJGrhZ6Hsrv8=
Loading

0 comments on commit c5c5eca

Please sign in to comment.