Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Persist AOF file by io_uring #750

Open
wants to merge 5 commits into
base: unstable
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 12 additions & 1 deletion src/Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -302,6 +302,17 @@ ifeq ($(MALLOC),jemalloc)
FINAL_LIBS := ../deps/jemalloc/lib/libjemalloc.a $(FINAL_LIBS)
endif

# only Linux has IO_URING support
ifeq ($(uname_S),Linux)
HAS_LIBURING := $(shell sh -c 'echo "$(NUMBER_SIGN_CHAR)include <liburing.h>" > foo.c; \
$(CC) -E foo.c > /dev/null 2>&1 && echo yes; \
rm foo.c')
ifeq ($(HAS_LIBURING),yes)
FINAL_CFLAGS+= -DHAVE_IO_URING
FINAL_LIBS+= -luring
endif
endif

# LIBSSL & LIBCRYPTO
LIBSSL_LIBS=
LIBSSL_PKGCONFIG := $(shell $(PKG_CONFIG) --exists libssl && echo $$?)
Expand Down Expand Up @@ -423,7 +434,7 @@ endif
ENGINE_NAME=valkey
SERVER_NAME=$(ENGINE_NAME)-server$(PROG_SUFFIX)
ENGINE_SENTINEL_NAME=$(ENGINE_NAME)-sentinel$(PROG_SUFFIX)
ENGINE_SERVER_OBJ=threads_mngr.o adlist.o quicklist.o ae.o anet.o dict.o kvstore.o server.o sds.o zmalloc.o lzf_c.o lzf_d.o pqsort.o zipmap.o sha1.o ziplist.o release.o io_threads.o networking.o util.o object.o db.o replication.o rdb.o t_string.o t_list.o t_set.o t_zset.o t_hash.o config.o aof.o pubsub.o multi.o debug.o sort.o intset.o syncio.o cluster.o cluster_legacy.o cluster_slot_stats.o crc16.o endianconv.o slowlog.o eval.o bio.o rio.o rand.o memtest.o syscheck.o crcspeed.o crccombine.o crc64.o bitops.o sentinel.o notify.o setproctitle.o blocked.o hyperloglog.o latency.o sparkline.o valkey-check-rdb.o valkey-check-aof.o geo.o lazyfree.o module.o evict.o expire.o geohash.o geohash_helper.o childinfo.o defrag.o siphash.o rax.o t_stream.o listpack.o localtime.o lolwut.o lolwut5.o lolwut6.o acl.o tracking.o socket.o tls.o sha256.o timeout.o setcpuaffinity.o monotonic.o mt19937-64.o resp_parser.o call_reply.o script_lua.o script.o functions.o function_lua.o commands.o strl.o connection.o unix.o logreqres.o
ENGINE_SERVER_OBJ=threads_mngr.o adlist.o quicklist.o ae.o anet.o dict.o kvstore.o server.o sds.o zmalloc.o lzf_c.o lzf_d.o pqsort.o zipmap.o sha1.o ziplist.o release.o io_threads.o networking.o util.o object.o db.o replication.o rdb.o t_string.o t_list.o t_set.o t_zset.o t_hash.o config.o aof.o pubsub.o multi.o debug.o sort.o intset.o syncio.o cluster.o cluster_legacy.o cluster_slot_stats.o crc16.o endianconv.o slowlog.o eval.o bio.o rio.o rand.o memtest.o syscheck.o crcspeed.o crccombine.o crc64.o bitops.o sentinel.o notify.o setproctitle.o blocked.o hyperloglog.o latency.o sparkline.o valkey-check-rdb.o valkey-check-aof.o geo.o lazyfree.o module.o evict.o expire.o geohash.o geohash_helper.o childinfo.o defrag.o siphash.o rax.o t_stream.o listpack.o localtime.o lolwut.o lolwut5.o lolwut6.o acl.o tracking.o socket.o tls.o sha256.o timeout.o setcpuaffinity.o monotonic.o mt19937-64.o resp_parser.o call_reply.o script_lua.o script.o functions.o function_lua.o commands.o strl.o connection.o unix.o logreqres.o io_uring.o
ENGINE_CLI_NAME=$(ENGINE_NAME)-cli$(PROG_SUFFIX)
ENGINE_CLI_OBJ=anet.o adlist.o dict.o valkey-cli.o zmalloc.o release.o ae.o serverassert.o crcspeed.o crccombine.o crc64.o siphash.o crc16.o monotonic.o cli_common.o mt19937-64.o strl.o cli_commands.o
ENGINE_BENCHMARK_NAME=$(ENGINE_NAME)-benchmark$(PROG_SUFFIX)
Expand Down
9 changes: 7 additions & 2 deletions src/aof.c
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
#include "bio.h"
#include "rio.h"
#include "functions.h"
#include "io_uring.h"

#include <signal.h>
#include <fcntl.h>
Expand Down Expand Up @@ -1012,8 +1013,13 @@ int startAppendOnly(void) {
* true, and in general it looks just more resilient to retry the write. If
* there is an actual error condition we'll get it at the next try. */
ssize_t aofWrite(int fd, const char *buf, size_t len) {
ssize_t nwritten = 0, totwritten = 0;
ssize_t totwritten = 0;
if (server.io_uring_enabled && getAofIOUring()) {
totwritten = aofWriteByIOUring(fd, buf, len);
return totwritten;
}

ssize_t nwritten = 0;
while (len) {
nwritten = write(fd, buf, len);

Expand All @@ -1026,7 +1032,6 @@ ssize_t aofWrite(int fd, const char *buf, size_t len) {
buf += nwritten;
totwritten += nwritten;
}

return totwritten;
}

Expand Down
1 change: 1 addition & 0 deletions src/config.c
Original file line number Diff line number Diff line change
Expand Up @@ -3105,6 +3105,7 @@ standardConfig static_configs[] = {
createBoolConfig("replica-ignore-disk-write-errors", NULL, MODIFIABLE_CONFIG, server.repl_ignore_disk_write_error, 0, NULL, NULL),
createBoolConfig("extended-redis-compatibility", NULL, MODIFIABLE_CONFIG, server.extended_redis_compat, 0, NULL, updateExtendedRedisCompat),
createBoolConfig("enable-debug-assert", NULL, IMMUTABLE_CONFIG | HIDDEN_CONFIG, server.enable_debug_assert, 0, NULL, NULL),
createBoolConfig("io-uring-enabled", NULL, DEBUG_CONFIG | IMMUTABLE_CONFIG, server.io_uring_enabled, 0, NULL, NULL),
createBoolConfig("cluster-slot-stats-enabled", NULL, MODIFIABLE_CONFIG, server.cluster_slot_stats_enabled, 0, NULL, NULL),

/* String Configs */
Expand Down
5 changes: 5 additions & 0 deletions src/config.h
Original file line number Diff line number Diff line change
Expand Up @@ -97,6 +97,11 @@
#define HAVE_EPOLL 1
#endif

/* Test for liburing API */
#ifndef __linux__
#define HAVE_IO_URING 0
#endif

/* Test for accept4() */
#if defined(__linux__) || defined(__FreeBSD__) || defined(OpenBSD5_7) || \
(defined(__DragonFly__) && __DragonFly_version >= 400305) || \
Expand Down
193 changes: 193 additions & 0 deletions src/io_uring.c
Original file line number Diff line number Diff line change
@@ -0,0 +1,193 @@
/*
* Copyright (c) 2024 Samsung Electronics Co., Ltd. All rights reserved.
* Author: Wenwen Chen <Wenwen.chen@samsung.com>
*
* Redistribution and use in source and binary fors, with or without
* odification, are peritted provided that the following conditions are et:
*
* * Redistributions of source code ust retain the above copyright notice,
* this list of conditions and the following disclaier.
* * Redistributions in binary for ust reproduce the above copyright
* notice, this list of conditions and the following disclaier in the
* documentation and/or other aterials provided with the distribution.
* * Neither the nae of Redis nor the naes of its contributors ay be used
* to endorse or proote products derived fro this software without
* specific prior written perission.
*
* THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
* AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
* IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
* ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
* LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
* CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
* SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
* INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
* CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
* ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
* POSSIBILITY OF SUCH DAMAGE.
*/

#include "io_uring.h"
#include "server.h"
#ifdef HAVE_IO_URING
#include <liburing.h>
#include "zmalloc.h"

/* AOF io_uring max QD and blocksize */
#define AOF_IOURING_MAX_ENTRIES (64)
#define AOF_IOURING_MAX_BLOCKSIZE (32 * 1024)

struct io_data {
struct iovec iov;
int used;
};

static struct io_data buffer[AOF_IOURING_MAX_ENTRIES];
static struct io_uring *_aof_io_uring;

static inline void initIoData(void) {
for (int index = 0; index < AOF_IOURING_MAX_ENTRIES; index++) {
buffer[index].used = 0;
}
}

static inline struct io_data *getIoData(void) {
struct io_data *ret = NULL;
for (int index = 0; index < AOF_IOURING_MAX_ENTRIES; index++) {
if (0 == buffer[index].used) {
ret = &buffer[index];
ret->used = 1;
return ret;
}
}
return ret;
}

static inline void releaseIoData(struct io_data *data) {
if (NULL == data) return;
data->used = 0;
}

int initAofIOUring(void) {
_aof_io_uring = NULL;
struct io_uring *ring = zmalloc(sizeof(struct io_uring));
if (!ring) return -1;

int ret = io_uring_queue_init(AOF_IOURING_MAX_ENTRIES, ring, 0);
if (ret != 0) {
zfree(ring);
return -1;
}

_aof_io_uring = ring;
initIoData();
return 0;
}

void freeAofIOUring(void) {
if (_aof_io_uring) {
io_uring_queue_exit(_aof_io_uring);
zfree(_aof_io_uring);
_aof_io_uring = NULL;
}
}

struct io_uring *getAofIOUring(void) {
if (!_aof_io_uring) initAofIOUring();

return _aof_io_uring;
}

static int prepWrite(int fd, struct io_data *data, unsigned nr_vecs, unsigned offset) {
struct io_uring_sqe *sqe = io_uring_get_sqe(_aof_io_uring);
if (!sqe) return -1;

io_uring_prep_writev(sqe, fd, &data->iov, nr_vecs, offset);
io_uring_sqe_set_data(sqe, data);
return 0;
}

static int reapCq(int *len) {
struct io_uring_cqe *cqe;
int ret = io_uring_wait_cqe(_aof_io_uring, &cqe);
if (ret < 0) return ret;

struct io_data *data = io_uring_cqe_get_data(cqe);
releaseIoData(data);
*len = cqe->res;
io_uring_cqe_seen(_aof_io_uring, cqe);
return 0;
}

int aofWriteByIOUring(int fd, const char *buf, size_t len) {
size_t submit_size = 0;
size_t complete_size = 0;
int submit_cnt = 0;
int complete_cnt = 0;
size_t write_left = len;

while (write_left || (submit_cnt > complete_cnt)) {
size_t offset = 0;
size_t this_size = 0;
int has_submit = submit_cnt;
struct io_data *data = NULL;

// Queue up as many writes as we can
while (write_left && ((submit_cnt - complete_cnt) < AOF_IOURING_MAX_ENTRIES)) {
this_size = write_left;
if (this_size > AOF_IOURING_MAX_BLOCKSIZE) this_size = AOF_IOURING_MAX_BLOCKSIZE;

data = getIoData();
if (NULL == data) return complete_size;

data->iov.iov_base = ((char *)buf) + offset;
data->iov.iov_len = this_size;
if (0 != prepWrite(fd, data, 1, 0)) return complete_size;

write_left -= this_size;
offset += this_size;
submit_size += this_size;
submit_cnt++;
}
if (has_submit != submit_cnt) {
int ret = io_uring_submit(_aof_io_uring);
if (ret < 0) return complete_size;
}
// Queue is full at this point. Find at least one completion.
while (complete_size < len) {
int cq_size = 0;
if (0 != reapCq(&cq_size)) break;

complete_size += cq_size;
complete_cnt++;
}
}

return complete_size;
}

#else

#ifndef UNUSED
#define UNUSED(V) ((void)V)
#endif

int initAofIOUring(void) {
return 0;
}

void freeAofIOUring(void) {
return;
}

struct io_uring *getAofIOUring(void) {
return 0;
}

int aofWriteByIOUring(int fd, const char *buf, size_t len) {
UNUSED(fd);
UNUSED(buf);
UNUSED(len);
return 0;
}
#endif /* end of HAVE_IO_URING */
45 changes: 45 additions & 0 deletions src/io_uring.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
/*
* Copyright (c) 2024 Samsung Electronics Co., Ltd. All rights reserved.
* Author: Wenwen Chen <Wenwen.chen@samsung.com>
* Redistribution and use in source and binary fors, with or without
* odification, are peritted provided that the following conditions are et:
*
* * Redistributions of source code ust retain the above copyright notice,
* this list of conditions and the following disclaier.
* * Redistributions in binary for ust reproduce the above copyright
* notice, this list of conditions and the following disclaier in the
* documentation and/or other aterials provided with the distribution.
* * Neither the nae of Redis nor the naes of its contributors ay be used
* to endorse or proote products derived fro this software without
* specific prior written perission.
*
* THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
* AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
* IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
* ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
* LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
* CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
* SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
* INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
* CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
* ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
* POSSIBILITY OF SUCH DAMAGE.
*/

#ifndef IO_URING_H
#define IO_URING_H
#include <stddef.h>

/* Initialize io_uring for AOF persistence at server startup
* if have io_uring configured, setup io_uring submission and completion. */
int initAofIOUring(void);

/* Free io_uring. */
void freeAofIOUring(void);

struct io_uring *getAofIOUring(void);

/* Persist aof_buf to file by using io_uring. */
int aofWriteByIOUring(int fd, const char *buf, size_t len);

#endif /* IO_URING_H */
17 changes: 17 additions & 0 deletions src/server.c
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@
#include "syscheck.h"
#include "threads_mngr.h"
#include "fmtargs.h"
#include "io_uring.h"
#include "io_threads.h"

#include <time.h>
Expand Down Expand Up @@ -2821,6 +2822,18 @@ void initListeners(void) {
void InitServerLast(void) {
bioInit();
initIOThreads();
if (server.io_uring_enabled) {
#ifdef HAVE_IO_URING
if (0 == initAofIOUring())
serverLog(LL_NOTICE, "aof io_uring init successfully.");
else {
serverLog(LL_WARNING, "aof io_uring init failed.");
exit(1);
}
#else
serverLog(LL_WARNING, "System doesn't support io_uring, not init aof io_uring.");
#endif
}
set_jemalloc_bg_thread(server.jemalloc_bg_thread);
server.initial_memory_usage = zmalloc_used_memory();
}
Expand Down Expand Up @@ -6968,6 +6981,10 @@ int main(int argc, char **argv) {

aeMain(server.el);
aeDeleteEventLoop(server.el);
if (server.io_uring_enabled) {
freeAofIOUring();
serverLog(LL_NOTICE, "aof io_uring free successfully.");
}
return 0;
}

Expand Down
1 change: 1 addition & 0 deletions src/server.h
Original file line number Diff line number Diff line change
Expand Up @@ -1924,6 +1924,7 @@ struct valkeyServer {
aofManifest *aof_manifest; /* Used to track AOFs. */
int aof_disable_auto_gc; /* If disable automatically deleting HISTORY type AOFs?
default no. (for testings). */
int io_uring_enabled;

/* RDB persistence */
long long dirty; /* Changes to DB from the last save */
Expand Down
1 change: 1 addition & 0 deletions tests/unit/introspection.tcl
Original file line number Diff line number Diff line change
Expand Up @@ -559,6 +559,7 @@ start_server {tags {"introspection"}} {
req-res-logfile
client-default-resp
dual-channel-replication-enabled
io-uring-enabled
}

if {!$::tls} {
Expand Down
8 changes: 8 additions & 0 deletions valkey.conf
Original file line number Diff line number Diff line change
Expand Up @@ -2363,3 +2363,11 @@ jemalloc-bg-thread yes
# we may also use this when making decisions for replication.
#
# availability-zone "zone-name"

# Enable/disable io_uring to persist AOF file.
# The preconditions of enable io_uring are:
# 1. Valkey was build with liburing
# 2. The machine which host Valkey server has installed liburing
#
# io-uring-enabled no

Loading