From 6ec4e085c60123a0722446a8756496a095f7891e Mon Sep 17 00:00:00 2001 From: Wenwen Chen Date: Thu, 20 Jun 2024 11:21:19 +0800 Subject: [PATCH 1/4] Persist AOF file by io_uring Signed-off-by: Wenwen Chen --- src/Makefile | 13 ++- src/aof.c | 8 +- src/config.c | 1 + src/config.h | 5 ++ src/io_uring.c | 167 +++++++++++++++++++++++++++++++++++ src/io_uring.h | 46 ++++++++++ src/server.c | 17 ++++ src/server.h | 1 + tests/unit/introspection.tcl | 1 + valkey.conf | 9 +- 10 files changed, 265 insertions(+), 3 deletions(-) create mode 100644 src/io_uring.c create mode 100644 src/io_uring.h diff --git a/src/Makefile b/src/Makefile index 18e5527eff..98ed18dcaa 100644 --- a/src/Makefile +++ b/src/Makefile @@ -302,6 +302,17 @@ ifeq ($(MALLOC),jemalloc) FINAL_LIBS := ../deps/jemalloc/lib/libjemalloc.a $(FINAL_LIBS) endif +# only Linux has IO_URING support +ifeq ($(uname_S),Linux) +HAS_LIBURING := $(shell sh -c 'echo "$(NUMBER_SIGN_CHAR)include <liburing.h>" > foo.c; \ + $(CC) -E foo.c > /dev/null 2>&1 && echo yes; \ + rm foo.c') +ifeq ($(HAS_LIBURING),yes) + FINAL_CFLAGS+= -DHAVE_IO_URING + FINAL_LIBS+= -luring +endif +endif + # LIBSSL & LIBCRYPTO LIBSSL_LIBS= LIBSSL_PKGCONFIG := $(shell $(PKG_CONFIG) --exists libssl && echo $$?) 
@@ -401,7 +412,7 @@ endif ENGINE_NAME=valkey SERVER_NAME=$(ENGINE_NAME)-server$(PROG_SUFFIX) ENGINE_SENTINEL_NAME=$(ENGINE_NAME)-sentinel$(PROG_SUFFIX) -ENGINE_SERVER_OBJ=threads_mngr.o adlist.o quicklist.o ae.o anet.o dict.o kvstore.o server.o sds.o zmalloc.o lzf_c.o lzf_d.o pqsort.o zipmap.o sha1.o ziplist.o release.o networking.o util.o object.o db.o replication.o rdb.o t_string.o t_list.o t_set.o t_zset.o t_hash.o config.o aof.o pubsub.o multi.o debug.o sort.o intset.o syncio.o cluster.o cluster_legacy.o cluster_slot_stats.o crc16.o endianconv.o slowlog.o eval.o bio.o rio.o rand.o memtest.o syscheck.o crcspeed.o crccombine.o crc64.o bitops.o sentinel.o notify.o setproctitle.o blocked.o hyperloglog.o latency.o sparkline.o valkey-check-rdb.o valkey-check-aof.o geo.o lazyfree.o module.o evict.o expire.o geohash.o geohash_helper.o childinfo.o defrag.o siphash.o rax.o t_stream.o listpack.o localtime.o lolwut.o lolwut5.o lolwut6.o acl.o tracking.o socket.o tls.o sha256.o timeout.o setcpuaffinity.o monotonic.o mt19937-64.o resp_parser.o call_reply.o script_lua.o script.o functions.o function_lua.o commands.o strl.o connection.o unix.o logreqres.o +ENGINE_SERVER_OBJ=threads_mngr.o adlist.o quicklist.o ae.o anet.o dict.o kvstore.o server.o sds.o zmalloc.o lzf_c.o lzf_d.o pqsort.o zipmap.o sha1.o ziplist.o release.o networking.o util.o object.o db.o replication.o rdb.o t_string.o t_list.o t_set.o t_zset.o t_hash.o config.o aof.o pubsub.o multi.o debug.o sort.o intset.o syncio.o cluster.o cluster_legacy.o cluster_slot_stats.o crc16.o endianconv.o slowlog.o eval.o bio.o rio.o rand.o memtest.o syscheck.o crcspeed.o crccombine.o crc64.o bitops.o sentinel.o notify.o setproctitle.o blocked.o hyperloglog.o latency.o sparkline.o valkey-check-rdb.o valkey-check-aof.o geo.o lazyfree.o module.o evict.o expire.o geohash.o geohash_helper.o childinfo.o defrag.o siphash.o rax.o t_stream.o listpack.o localtime.o lolwut.o lolwut5.o lolwut6.o acl.o tracking.o socket.o tls.o sha256.o 
timeout.o setcpuaffinity.o monotonic.o mt19937-64.o resp_parser.o call_reply.o script_lua.o script.o functions.o function_lua.o commands.o strl.o connection.o unix.o logreqres.o io_uring.o ENGINE_CLI_NAME=$(ENGINE_NAME)-cli$(PROG_SUFFIX) ENGINE_CLI_OBJ=anet.o adlist.o dict.o valkey-cli.o zmalloc.o release.o ae.o serverassert.o crcspeed.o crccombine.o crc64.o siphash.o crc16.o monotonic.o cli_common.o mt19937-64.o strl.o cli_commands.o ENGINE_BENCHMARK_NAME=$(ENGINE_NAME)-benchmark$(PROG_SUFFIX) diff --git a/src/aof.c b/src/aof.c index 1a47d9c688..034af65026 100644 --- a/src/aof.c +++ b/src/aof.c @@ -31,6 +31,7 @@ #include "bio.h" #include "rio.h" #include "functions.h" +#include "io_uring.h" #include #include @@ -1007,8 +1008,13 @@ int startAppendOnly(void) { * true, and in general it looks just more resilient to retry the write. If * there is an actual error condition we'll get it at the next try. */ ssize_t aofWrite(int fd, const char *buf, size_t len) { - ssize_t nwritten = 0, totwritten = 0; + ssize_t totwritten = 0; + if (server.io_uring_enabled && getAofIOUring()) { + totwritten = aofWriteByIOUring(fd, buf, len); + return totwritten; + } + ssize_t nwritten = 0; while (len) { nwritten = write(fd, buf, len); diff --git a/src/config.c b/src/config.c index f8784413f9..c03dde8c7b 100644 --- a/src/config.c +++ b/src/config.c @@ -3069,6 +3069,7 @@ standardConfig static_configs[] = { createBoolConfig("replica-ignore-disk-write-errors", NULL, MODIFIABLE_CONFIG, server.repl_ignore_disk_write_error, 0, NULL, NULL), createBoolConfig("extended-redis-compatibility", NULL, MODIFIABLE_CONFIG, server.extended_redis_compat, 0, NULL, updateExtendedRedisCompat), createBoolConfig("enable-debug-assert", NULL, IMMUTABLE_CONFIG | HIDDEN_CONFIG, server.enable_debug_assert, 0, NULL, NULL), + createBoolConfig("io-uring-enabled", NULL, DEBUG_CONFIG | IMMUTABLE_CONFIG, server.io_uring_enabled, 0, NULL, NULL), /* String Configs */ createStringConfig("aclfile", NULL, IMMUTABLE_CONFIG, 
ALLOW_EMPTY_STRING, server.acl_filename, "", NULL, NULL), diff --git a/src/config.h b/src/config.h index 95c2e84a00..89f4861581 100644 --- a/src/config.h +++ b/src/config.h @@ -97,6 +97,11 @@ #define HAVE_EPOLL 1 #endif +/* Test for liburing API */ +#ifndef __linux__ +#define HAVE_IO_URING 0 +#endif + /* Test for accept4() */ #if defined(__linux__) || defined(__FreeBSD__) || defined(OpenBSD5_7) || \ (defined(__DragonFly__) && __DragonFly_version >= 400305) || \ diff --git a/src/io_uring.c b/src/io_uring.c new file mode 100644 index 0000000000..75d3de949d --- /dev/null +++ b/src/io_uring.c @@ -0,0 +1,167 @@ +/* + * Copyright (c) 2009-2016, Salvatore Sanfilippo + * All rights reserved. + * + * Redistribution and use in source and binary fors, with or without + * odification, are peritted provided that the following conditions are et: + * + * * Redistributions of source code ust retain the above copyright notice, + * this list of conditions and the following disclaier. + * * Redistributions in binary for ust reproduce the above copyright + * notice, this list of conditions and the following disclaier in the + * docuentation and/or other aterials provided with the distribution. + * * Neither the nae of Redis nor the naes of its contributors ay be used + * to endorse or proote products derived fro this software without + * specific prior written perission. + * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" + * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE + * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE + * ARE DISCLAIMED. 
IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE + * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR + * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF + * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS + * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN + * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) + * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE + * POSSIBILITY OF SUCH DAMAGE. + */ + +#include "io_uring.h" + +#ifdef HAVE_IO_URING +#include +#include "zmalloc.h" + +/* AOF io_uring max QD and blocksize */ +#define AOF_IOURING_MAX_ENTRIES (64) +#define AOF_IOURING_MAX_BLOCKSIZE (32 * 1024) + +static struct io_uring *_aof_io_uring; +static struct iovec iov[AOF_IOURING_MAX_ENTRIES]; +static int inflight = 0; + +int initAofIOUring(void) { + _aof_io_uring = NULL; + struct io_uring *ring = zmalloc(sizeof(struct io_uring)); + if (!ring) { + fprintf(stderr, "failed to allocate memory for aof io_uring...\n"); + return -1; + } + + int ret = io_uring_queue_init(AOF_IOURING_MAX_ENTRIES, ring, 0); + if (ret != 0) { + fprintf(stderr, "failed to init queue of aof io_uring...\n"); + zfree(ring); + return -1; + } + + _aof_io_uring = ring; + return 0; +} + +void freeAofIOUring(void) { + if (_aof_io_uring) { + io_uring_queue_exit(_aof_io_uring); + zfree(_aof_io_uring); + _aof_io_uring = NULL; + } +} + +struct io_uring *getAofIOUring() { + if (!_aof_io_uring) { + initAofIOUring(); + } + return _aof_io_uring; +} + +static int prepWrite(int fd, struct iovec *iov, unsigned nr_vecs, unsigned offset) { + struct io_uring_sqe *sqe = io_uring_get_sqe(_aof_io_uring); + if (!sqe) return -1; + + io_uring_prep_writev(sqe, fd, iov, nr_vecs, offset); + io_uring_sqe_set_data(sqe, &_aof_io_uring); + return 0; +} + +static int reapCompletions(void) { + struct io_uring_cqe *cqes[inflight]; + int cqecnt = io_uring_peek_batch_cqe(_aof_io_uring, 
cqes, inflight); + if (cqecnt < 0) { + return -1; + } + io_uring_cq_advance(_aof_io_uring, cqecnt); + inflight -= cqecnt; + return 0; +} + +int aofWriteByIOUring(int fd, const char *buf, size_t len) { + ssize_t writing = 0; + ssize_t totwritten = 0; + + while (len) { + ssize_t offset = 0; + ssize_t this_size = 0; + int has_inflight = inflight; + + while (len && (inflight < AOF_IOURING_MAX_ENTRIES)) { + this_size = len; + if (this_size > AOF_IOURING_MAX_BLOCKSIZE) this_size = AOF_IOURING_MAX_BLOCKSIZE; + + iov[inflight].iov_base = ((char *)buf) + offset; + iov[inflight].iov_len = this_size; + if (0 != prepWrite(fd, &iov[inflight], 1, 0)) { + fprintf(stderr, "## prepWrite failed when persist AOF file by io_uring...\n"); + } + + len -= this_size; + offset += this_size; + writing += this_size; + inflight++; + } + + if (has_inflight != inflight) io_uring_submit(_aof_io_uring); + + int depth; + if (len) + depth = AOF_IOURING_MAX_ENTRIES; + else + depth = 1; + + while (inflight >= depth) { + if (0 != reapCompletions()) { + fprintf(stderr, "## reapCompletions failed when persist AOF file by io_uring...\n"); + return totwritten; + } + } + totwritten = writing; + } + + return totwritten; +} + +#else + +#ifndef UNUSED +#define UNUSED(V) ((void)V) +#endif + +int initAofIOUring(void) { + return 0; +} + +void freeAofIOUring(void) { + return; +} + +struct io_uring *getAofIOUring() { + return 0; +} + +int aofWriteByIOUring(int fd, const char *buf, size_t len) { + UNUSED(fd); + UNUSED(buf); + UNUSED(len); + return 0; +} +#endif /* end of HAVE_IO_URING */ diff --git a/src/io_uring.h b/src/io_uring.h new file mode 100644 index 0000000000..dfbde16a72 --- /dev/null +++ b/src/io_uring.h @@ -0,0 +1,46 @@ +/* + * Copyright (c) 2009-2016, Salvatore Sanfilippo + * All rights reserved. 
+ * + * Redistribution and use in source and binary fors, with or without + * odification, are peritted provided that the following conditions are et: + * + * * Redistributions of source code ust retain the above copyright notice, + * this list of conditions and the following disclaier. + * * Redistributions in binary for ust reproduce the above copyright + * notice, this list of conditions and the following disclaier in the + * docuentation and/or other aterials provided with the distribution. + * * Neither the nae of Redis nor the naes of its contributors ay be used + * to endorse or proote products derived fro this software without + * specific prior written perission. + * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" + * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE + * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE + * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE + * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR + * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF + * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS + * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN + * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) + * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE + * POSSIBILITY OF SUCH DAMAGE. + */ + +#ifndef IO_URING_H +#define IO_URING_H +#include + +/* Initialize io_uring for AOF persistence at server startup + * if have io_uring configured, setup io_uring submission and completion. */ +int initAofIOUring(void); + +/* Free io_uring. */ +void freeAofIOUring(void); + +struct io_uring *getAofIOUring(void); + +/* Persist aof_buf to file by using io_uring. 
*/ +int aofWriteByIOUring(int fd, const char *buf, size_t len); + +#endif /* IO_URING_H */ diff --git a/src/server.c b/src/server.c index 57456c6597..a75a5ecccc 100644 --- a/src/server.c +++ b/src/server.c @@ -39,6 +39,7 @@ #include "syscheck.h" #include "threads_mngr.h" #include "fmtargs.h" +#include "io_uring.h" #include #include @@ -2797,6 +2798,18 @@ void initListeners(void) { void InitServerLast(void) { bioInit(); initThreadedIO(); + if (server.io_uring_enabled) { +#ifdef HAVE_IO_URING + if (0 == initAofIOUring()) + serverLog(LL_NOTICE, "aof io_uring init successfully."); + else { + serverLog(LL_WARNING, "aof io_uring init failed."); + exit(1); + } +#else + serverLog(LL_WARNING, "System doesn't support io_uring, not init aof io_uring."); +#endif + } set_jemalloc_bg_thread(server.jemalloc_bg_thread); server.initial_memory_usage = zmalloc_used_memory(); } @@ -6984,6 +6997,10 @@ int main(int argc, char **argv) { aeMain(server.el); aeDeleteEventLoop(server.el); + if (server.io_uring_enabled) { + freeAofIOUring(); + serverLog(LL_NOTICE, "aof io_uring free successfully."); + } return 0; } diff --git a/src/server.h b/src/server.h index 73f68a73d4..e6c10bc9c7 100644 --- a/src/server.h +++ b/src/server.h @@ -1833,6 +1833,7 @@ struct valkeyServer { aofManifest *aof_manifest; /* Used to track AOFs. */ int aof_disable_auto_gc; /* If disable automatically deleting HISTORY type AOFs? default no. (for testings). 
*/ + int io_uring_enabled; /* RDB persistence */ long long dirty; /* Changes to DB from the last save */ diff --git a/tests/unit/introspection.tcl b/tests/unit/introspection.tcl index a12a3ba23d..90e5ceacb0 100644 --- a/tests/unit/introspection.tcl +++ b/tests/unit/introspection.tcl @@ -558,6 +558,7 @@ start_server {tags {"introspection"}} { socket-mark-id req-res-logfile client-default-resp + io-uring-enabled } if {!$::tls} { diff --git a/valkey.conf b/valkey.conf index e4ffd0f8ad..723f70d957 100644 --- a/valkey.conf +++ b/valkey.conf @@ -2324,4 +2324,11 @@ jemalloc-bg-thread yes # this is only exposed via the info command for clients to use, but in the future we # we may also use this when making decisions for replication. # -# availability-zone "zone-name" \ No newline at end of file +# availability-zone "zone-name" + +# Enable/disable io_uring to persist the AOF file. +# The preconditions for enabling io_uring are: +# 1. Valkey was built with liburing +# 2. The machine that hosts the Valkey server has liburing installed +# +# io-uring-enabled no From 6c1370850405ac490917e9eff99805490822f0fe Mon Sep 17 00:00:00 2001 From: Wenwen Chen Date: Fri, 5 Jul 2024 15:36:10 +0800 Subject: [PATCH 2/4] fix clang-format Signed-off-by: Wenwen Chen --- src/io_uring.c | 4 ++-- src/server.c | 26 +++++++++++++------------- 2 files changed, 15 insertions(+), 15 deletions(-) diff --git a/src/io_uring.c b/src/io_uring.c index 75d3de949d..4226108c0d 100644 --- a/src/io_uring.c +++ b/src/io_uring.c @@ -34,7 +34,7 @@ #include "zmalloc.h" /* AOF io_uring max QD and blocksize */ -#define AOF_IOURING_MAX_ENTRIES (64) +#define AOF_IOURING_MAX_ENTRIES (64) #define AOF_IOURING_MAX_BLOCKSIZE (32 * 1024) static struct io_uring *_aof_io_uring; @@ -131,7 +131,7 @@ int aofWriteByIOUring(int fd, const char *buf, size_t len) { while (inflight >= depth) { if (0 != reapCompletions()) { fprintf(stderr, "## reapCompletions failed when persist AOF file by io_uring...\n"); - return totwritten; + return totwritten; 
} } totwritten = writing; diff --git a/src/server.c b/src/server.c index a75a5ecccc..256176b45f 100644 --- a/src/server.c +++ b/src/server.c @@ -2798,18 +2798,18 @@ void initListeners(void) { void InitServerLast(void) { bioInit(); initThreadedIO(); - if (server.io_uring_enabled) { + if (server.io_uring_enabled) { #ifdef HAVE_IO_URING - if (0 == initAofIOUring()) - serverLog(LL_NOTICE, "aof io_uring init successfully."); - else { - serverLog(LL_WARNING, "aof io_uring init failed."); - exit(1); - } + if (0 == initAofIOUring()) + serverLog(LL_NOTICE, "aof io_uring init successfully."); + else { + serverLog(LL_WARNING, "aof io_uring init failed."); + exit(1); + } #else - serverLog(LL_WARNING, "System doesn't support io_uring, not init aof io_uring."); + serverLog(LL_WARNING, "System doesn't support io_uring, not init aof io_uring."); #endif - } + } set_jemalloc_bg_thread(server.jemalloc_bg_thread); server.initial_memory_usage = zmalloc_used_memory(); } @@ -6997,10 +6997,10 @@ int main(int argc, char **argv) { aeMain(server.el); aeDeleteEventLoop(server.el); - if (server.io_uring_enabled) { - freeAofIOUring(); - serverLog(LL_NOTICE, "aof io_uring free successfully."); - } + if (server.io_uring_enabled) { + freeAofIOUring(); + serverLog(LL_NOTICE, "aof io_uring free successfully."); + } return 0; } From 0c6ed8cb34f839ee2f7f62a24c10839c5b1a038c Mon Sep 17 00:00:00 2001 From: Wenwen Chen Date: Fri, 5 Jul 2024 16:04:53 +0800 Subject: [PATCH 3/4] fix minor compiling warning and spelling error Signed-off-by: Wenwen Chen --- src/io_uring.c | 6 +++--- src/io_uring.h | 2 +- 2 files changed, 4 insertions(+), 4 deletions(-) diff --git a/src/io_uring.c b/src/io_uring.c index 4226108c0d..af3f35e6a6 100644 --- a/src/io_uring.c +++ b/src/io_uring.c @@ -9,7 +9,7 @@ * this list of conditions and the following disclaier. 
* * Redistributions in binary for ust reproduce the above copyright * notice, this list of conditions and the following disclaier in the - * docuentation and/or other aterials provided with the distribution. + * documentation and/or other aterials provided with the distribution. * * Neither the nae of Redis nor the naes of its contributors ay be used * to endorse or proote products derived fro this software without * specific prior written perission. @@ -68,7 +68,7 @@ void freeAofIOUring(void) { } } -struct io_uring *getAofIOUring() { +struct io_uring *getAofIOUring(void) { if (!_aof_io_uring) { initAofIOUring(); } @@ -154,7 +154,7 @@ void freeAofIOUring(void) { return; } -struct io_uring *getAofIOUring() { +struct io_uring *getAofIOUring(void) { return 0; } diff --git a/src/io_uring.h b/src/io_uring.h index dfbde16a72..f281ee27df 100644 --- a/src/io_uring.h +++ b/src/io_uring.h @@ -9,7 +9,7 @@ * this list of conditions and the following disclaier. * * Redistributions in binary for ust reproduce the above copyright * notice, this list of conditions and the following disclaier in the - * docuentation and/or other aterials provided with the distribution. + * documentation and/or other aterials provided with the distribution. * * Neither the nae of Redis nor the naes of its contributors ay be used * to endorse or proote products derived fro this software without * specific prior written perission. 
From 8cc8454f19a678f6f0a90d966d4ba88280eb6a6a Mon Sep 17 00:00:00 2001 From: Wenwen Chen Date: Thu, 1 Aug 2024 16:34:25 +0800 Subject: [PATCH 4/4] Reap io_uring completion one by one Signed-off-by: Wenwen Chen --- src/aof.c | 1 - src/io_uring.c | 140 +++++++++++++++++++++++++++++-------------------- src/io_uring.h | 5 +- 3 files changed, 85 insertions(+), 61 deletions(-) diff --git a/src/aof.c b/src/aof.c index 034af65026..5499beedb0 100644 --- a/src/aof.c +++ b/src/aof.c @@ -1027,7 +1027,6 @@ ssize_t aofWrite(int fd, const char *buf, size_t len) { buf += nwritten; totwritten += nwritten; } - return totwritten; } diff --git a/src/io_uring.c b/src/io_uring.c index af3f35e6a6..ce9011095b 100644 --- a/src/io_uring.c +++ b/src/io_uring.c @@ -1,6 +1,6 @@ /* - * Copyright (c) 2009-2016, Salvatore Sanfilippo - * All rights reserved. + * Copyright (c) 2024 Samsung Electronics Co., Ltd. All rights reserved. + * Author: Wenwen Chen * * Redistribution and use in source and binary fors, with or without * odification, are peritted provided that the following conditions are et: @@ -28,7 +28,7 @@ */ #include "io_uring.h" - +#include "server.h" #ifdef HAVE_IO_URING #include #include "zmalloc.h" @@ -37,26 +37,50 @@ #define AOF_IOURING_MAX_ENTRIES (64) #define AOF_IOURING_MAX_BLOCKSIZE (32 * 1024) +struct io_data { + struct iovec iov; + int used; +}; + +static struct io_data buffer[AOF_IOURING_MAX_ENTRIES]; static struct io_uring *_aof_io_uring; -static struct iovec iov[AOF_IOURING_MAX_ENTRIES]; -static int inflight = 0; + +static inline void initIoData(void) { + for (int index = 0; index < AOF_IOURING_MAX_ENTRIES; index++) { + buffer[index].used = 0; + } +} + +static inline struct io_data *getIoData(void) { + struct io_data *ret = NULL; + for (int index = 0; index < AOF_IOURING_MAX_ENTRIES; index++) { + if (0 == buffer[index].used) { + ret = &buffer[index]; + ret->used = 1; + return ret; + } + } + return ret; +} + +static inline void releaseIoData(struct io_data *data) { + if (NULL 
== data) return; + data->used = 0; +} int initAofIOUring(void) { _aof_io_uring = NULL; struct io_uring *ring = zmalloc(sizeof(struct io_uring)); - if (!ring) { - fprintf(stderr, "failed to allocate memory for aof io_uring...\n"); - return -1; - } + if (!ring) return -1; int ret = io_uring_queue_init(AOF_IOURING_MAX_ENTRIES, ring, 0); if (ret != 0) { - fprintf(stderr, "failed to init queue of aof io_uring...\n"); zfree(ring); return -1; } _aof_io_uring = ring; + initIoData(); return 0; } @@ -69,75 +93,77 @@ void freeAofIOUring(void) { } struct io_uring *getAofIOUring(void) { - if (!_aof_io_uring) { - initAofIOUring(); - } + if (!_aof_io_uring) initAofIOUring(); + return _aof_io_uring; } -static int prepWrite(int fd, struct iovec *iov, unsigned nr_vecs, unsigned offset) { +static int prepWrite(int fd, struct io_data *data, unsigned nr_vecs, unsigned offset) { struct io_uring_sqe *sqe = io_uring_get_sqe(_aof_io_uring); if (!sqe) return -1; - io_uring_prep_writev(sqe, fd, iov, nr_vecs, offset); - io_uring_sqe_set_data(sqe, &_aof_io_uring); + io_uring_prep_writev(sqe, fd, &data->iov, nr_vecs, offset); + io_uring_sqe_set_data(sqe, data); return 0; } -static int reapCompletions(void) { - struct io_uring_cqe *cqes[inflight]; - int cqecnt = io_uring_peek_batch_cqe(_aof_io_uring, cqes, inflight); - if (cqecnt < 0) { - return -1; - } - io_uring_cq_advance(_aof_io_uring, cqecnt); - inflight -= cqecnt; +static int reapCq(int *len) { + struct io_uring_cqe *cqe; + int ret = io_uring_wait_cqe(_aof_io_uring, &cqe); + if (ret < 0) return ret; + + struct io_data *data = io_uring_cqe_get_data(cqe); + releaseIoData(data); + *len = cqe->res; + io_uring_cqe_seen(_aof_io_uring, cqe); return 0; } int aofWriteByIOUring(int fd, const char *buf, size_t len) { - ssize_t writing = 0; - ssize_t totwritten = 0; - - while (len) { - ssize_t offset = 0; - ssize_t this_size = 0; - int has_inflight = inflight; - - while (len && (inflight < AOF_IOURING_MAX_ENTRIES)) { - this_size = len; + size_t 
submit_size = 0; + size_t complete_size = 0; + int submit_cnt = 0; + int complete_cnt = 0; + size_t write_left = len; + + while (write_left || (submit_cnt > complete_cnt)) { + size_t offset = 0; + size_t this_size = 0; + int has_submit = submit_cnt; + struct io_data *data = NULL; + + // Queue up as many writes as we can + while (write_left && ((submit_cnt - complete_cnt) < AOF_IOURING_MAX_ENTRIES)) { + this_size = write_left; if (this_size > AOF_IOURING_MAX_BLOCKSIZE) this_size = AOF_IOURING_MAX_BLOCKSIZE; - iov[inflight].iov_base = ((char *)buf) + offset; - iov[inflight].iov_len = this_size; - if (0 != prepWrite(fd, &iov[inflight], 1, 0)) { - fprintf(stderr, "## prepWrite failed when persist AOF file by io_uring...\n"); - } + data = getIoData(); + if (NULL == data) return complete_size; + + data->iov.iov_base = ((char *)buf) + offset; + data->iov.iov_len = this_size; + if (0 != prepWrite(fd, data, 1, 0)) return complete_size; - len -= this_size; + write_left -= this_size; offset += this_size; - writing += this_size; - inflight++; + submit_size += this_size; + submit_cnt++; } + if (has_submit != submit_cnt) { + int ret = io_uring_submit(_aof_io_uring); + if (ret < 0) return complete_size; + } + // Queue is full at this point. Find at least one completion. 
+ while (complete_size < len) { + int cq_size = 0; + if (0 != reapCq(&cq_size)) break; - if (has_inflight != inflight) io_uring_submit(_aof_io_uring); - - int depth; - if (len) - depth = AOF_IOURING_MAX_ENTRIES; - else - depth = 1; - - while (inflight >= depth) { - if (0 != reapCompletions()) { - fprintf(stderr, "## reapCompletions failed when persist AOF file by io_uring...\n"); - return totwritten; - } + complete_size += cq_size; + complete_cnt++; } - totwritten = writing; } - return totwritten; + return complete_size; } #else diff --git a/src/io_uring.h b/src/io_uring.h index f281ee27df..96509b5e8f 100644 --- a/src/io_uring.h +++ b/src/io_uring.h @@ -1,7 +1,6 @@ /* - * Copyright (c) 2009-2016, Salvatore Sanfilippo - * All rights reserved. - * + * Copyright (c) 2024 Samsung Electronics Co., Ltd. All rights reserved. + * Author: Wenwen Chen * Redistribution and use in source and binary fors, with or without * odification, are peritted provided that the following conditions are et: *