diff --git a/README.md b/README.md index ee3da7c..ee04bd7 100644 --- a/README.md +++ b/README.md @@ -1,6 +1,6 @@ # librdb -C library for parsing RDB files. +This is C library for parsing RDB files. The Parser is implemented in the spirit of SAX parser. It fires off a series of events as it reads the RDB file from beginning to end, and callbacks to handlers registered on @@ -123,9 +123,9 @@ Level0. The same goes between Level2 and Level1 correspondingly. ### Handlers The **Handlers** represent a set of builtin or user-defined functions that will be called on the parsed data. Future plan to support built-in Handlers: -* Convert RDB to JSON file handlers. (Status: WIP) -* Convert RDB to RESP protocol handlers. (Status: WIP) -* Memory Analyze (Status: Todo) +* Convert RDB to JSON file handlers. +* Convert RDB to RESP protocol handlers. +* Memory Analyze (TODO) It is possible to attach to parser more than one set of handlers at the same level. That is, for a given data at a given level, the parser will call each of the handlers that diff --git a/api/librdb-api.h b/api/librdb-api.h index 704a4ed..5fb910d 100644 --- a/api/librdb-api.h +++ b/api/librdb-api.h @@ -408,7 +408,7 @@ _LIBRDB_API RdbState RDB_getState(RdbParser *p); /* get number of handlers registered at given level */ _LIBRDB_API int RDB_getNumHandlers(RdbParser *p, RdbHandlersLevel lvl); -/* set the parser to ignore checksum errors */ +/* To ignore on checksum error. Else parser return RDB_ERR_CHECKSUM_FAILURE */ _LIBRDB_API void RDB_IgnoreChecksum(RdbParser *p); /* There could be relatively large strings stored within Redis, which are diff --git a/api/librdb-ext-api.h b/api/librdb-ext-api.h index 9bc24d3..3fecf32 100644 --- a/api/librdb-ext-api.h +++ b/api/librdb-ext-api.h @@ -139,7 +139,7 @@ _LIBRDB_API RdbxToResp *RDBX_createHandlersToResp(RdbParser *, RdbxToRespConf *) /**************************************************************** * RESP writer * - * Create instance for writing RDB to RESP stream. + * Interface to create writer instance for RDB to RESP stream. * * Imp by: RDBX_createRespToRedisTcp * RDBX_createRespToRedisFd @@ -147,12 +147,28 @@ _LIBRDB_API RdbxToResp *RDBX_createHandlersToResp(RdbParser *, RdbxToRespConf *) * ****************************************************************/ +/* On start command pass command info. NULL otherwise. */ +typedef struct RdbxRespWriterStartCmd { + /* Redis Command name (Ex: "SET", "RESTORE"). Owned by the caller. It is + * constant static string and Valid for ref behind the duration of the call. */ + const char *cmd; + /* If key available as part of command. Else empty string. + * Owned by the caller. */ + const char *key; +} RdbxRespWriterStartCmd; + typedef struct RdbxRespWriter { void *ctx; void (*delete)(void *ctx); /* return 0 on success. Otherwise 1 */ - int (*writev) (void *ctx, struct iovec *ioVec, int iovCnt, int startCmd, int endCmd); + int (*writev) (void *ctx, + struct iovec *ioVec, /* Standard C scatter/gather IO array */ + int iovCnt, /* Number of iovec elements */ + RdbxRespWriterStartCmd *startCmd, /* If start of RESP command then not NULL. Owned by + * the caller. Valid for the duration of the call. */ + int endCmd); /* 1, if this is end of RESP command, 0 otherwise */ + int (*flush) (void *ctx); } RdbxRespWriter; diff --git a/examples/example1.c b/examples/example1.c index b6d98a5..610e57d 100644 --- a/examples/example1.c +++ b/examples/example1.c @@ -1,7 +1,7 @@ /* The following C file serves as an illustration of how to use the librdb * library for transforming Redis RDB files into JSON format. If you wish to see * the various parsing components being invoked in the background, simply set - * the environment variable ENV_VAR_DEBUG_DATA to 1. + * the environment variable LIBRDB_DEBUG_DATA to 1. * * $ export LIBRDB_DEBUG_DATA=1 * $ make example diff --git a/src/ext/handlersToResp.c b/src/ext/handlersToResp.c index 68fa39d..868e6f1 100644 --- a/src/ext/handlersToResp.c +++ b/src/ext/handlersToResp.c @@ -58,8 +58,7 @@ struct RdbxToResp { size_t writeFromCmdNum; } debug; - /* Init to 3. Attempted to be released three times on termination */ - int refcount; + int refcount; /* intrusive refcount - Init to 2. Attempted to be released two times on termination */ RdbParser *parser; RdbxRespWriter respWriter; @@ -221,6 +220,10 @@ static inline RdbRes onWriteNewCmdDbg(RdbxToResp *ctx) { if (ctx->debug.flags & RFLAG_ENUM_CMD_ID) { char keyLenStr[32], cmdIdLenStr[32], cmdIdStr[32]; + RdbxRespWriterStartCmd startCmd; + startCmd.cmd = "SET"; + startCmd.key = KEY_CMD_ID_DBG; + struct iovec iov[7]; /* write SET */ IOV_CONST(&iov[0], "*3\r\n$3\r\nSET\r\n$"); @@ -230,7 +233,7 @@ static inline RdbRes onWriteNewCmdDbg(RdbxToResp *ctx) { /* write cmd-id */ IOV_CONST(&iov[3], "\r\n$"); IOV_LEN_AND_VAL(&iov[4], currCmdNum, cmdIdLenStr, cmdIdStr); - if (unlikely(writer->writev(writer->ctx, iov, 6, 1, 1))) { + if (unlikely(writer->writev(writer->ctx, iov, 6, &startCmd, 1))) { RdbRes errCode = RDB_getErrorCode(ctx->parser); /* If failed to write RESP writer but no error reported, then write some general error */ @@ -243,7 +246,8 @@ static inline RdbRes onWriteNewCmdDbg(RdbxToResp *ctx) { return RDB_OK; } -static inline RdbRes writevWrap(RdbxToResp *ctx, struct iovec *iov, int cnt, int startCmd, int endCmd) { +static inline RdbRes writevWrap(RdbxToResp *ctx, struct iovec *iov, int cnt, + RdbxRespWriterStartCmd *startCmd, int endCmd) { RdbRes res; RdbxRespWriter *writer = &ctx->respWriter; @@ -287,6 +291,11 @@ static inline RdbRes sendFirstRestoreFrag(RdbxToResp *ctx, RdbBulk frag, size_t if (ctx->keyCtx.delBeforeWrite == DEL_KEY_BEFORE_BY_RESTORE_REPLACE) extra_args++; + RdbxRespWriterStartCmd startCmd; + startCmd.cmd = "RESTORE"; + startCmd.key = ctx->keyCtx.key; + + /* writev RESTORE */ char cmd[64]; int len = snprintf(cmd, sizeof(cmd), "*%d\r\n$7\r\nRESTORE", 4+extra_args); @@ -305,17 +314,23 @@ static inline RdbRes sendFirstRestoreFrag(RdbxToResp *ctx, RdbBulk frag, size_t IOV_VALUE(&iov[iovs++], ctx->restoreCtx.restoreSize + 10, lenStr); /* write restore len + trailer */ IOV_STRING(&iov[iovs++], frag, fragLen); /* write first frag */ - return writevWrap(ctx, iov, iovs, 1, 0); + return writevWrap(ctx, iov, iovs, &startCmd, 0); } static inline RdbRes sendFirstRestoreFragModuleAux(RdbxToResp *ctx, RdbBulk frag, size_t fragLen) { struct iovec iov[3]; char lenStr[32]; + + RdbxRespWriterStartCmd startCmd; + startCmd.cmd = "RESTOREMODAUX"; + startCmd.key = ""; + + /* writev RESTOREMODAUX */ iov[0].iov_base = ctx->restoreCtx.moduleAux.cmdPrefix; iov[0].iov_len = ctx->restoreCtx.moduleAux.cmdlen; IOV_LENGTH(&iov[1], ctx->restoreCtx.restoreSize + 10, lenStr); /* write restore len + trailer */ IOV_STRING(&iov[2], frag, fragLen); /* write first frag */ - return writevWrap(ctx, iov, 3, 1, 0); + return writevWrap(ctx, iov, 3, &startCmd, 0); } /*** Handling common ***/ @@ -327,14 +342,17 @@ static RdbRes toRespNewDb(RdbParser *p, void *userData, int dbid) { char dbidStr[10], cntStr[10]; RdbxToResp *ctx = userData; - int cnt = ll2string(dbidStr, sizeof(dbidStr), dbid); + RdbxRespWriterStartCmd startCmd; + startCmd.cmd = "SELECT"; + startCmd.key = ""; + IOV_CONST(&iov[0], "*2\r\n$6\r\nSELECT"); IOV_LENGTH(&iov[1], cnt, cntStr); IOV_STRING(&iov[2], dbidStr, cnt); IOV_CONST(&iov[3], "\r\n"); - return writevWrap(ctx, iov, 4, 1, 1); + return writevWrap(ctx, iov, 4, &startCmd, 1); } static RdbRes toRespStartRdb(RdbParser *p, void *userData, int rdbVersion) { @@ -365,11 +383,16 @@ static RdbRes toRespNewKey(RdbParser *p, void *userData, RdbBulk key, RdbKeyInfo if ((ctx->keyCtx.delBeforeWrite == DEL_KEY_BEFORE_BY_DEL_CMD) && (info->opcode != _RDB_TYPE_STRING)) { struct iovec iov[4]; char keyLenStr[32]; + + RdbxRespWriterStartCmd startCmd; + startCmd.cmd = "DEL"; + startCmd.key = ctx->keyCtx.key; + IOV_CONST(&iov[0], "*2\r\n$3\r\nDEL"); IOV_LENGTH(&iov[1], ctx->keyCtx.keyLen, keyLenStr); IOV_STRING(&iov[2], ctx->keyCtx.key, ctx->keyCtx.keyLen); IOV_CONST(&iov[3], "\r\n"); - return writevWrap(ctx, iov, 4, 1, 1); + return writevWrap(ctx, iov, 4, &startCmd, 1); } return RDB_OK; } @@ -381,6 +404,10 @@ static RdbRes toRespEndKey(RdbParser *p, void *userData) { /* key is in db. Set its expiration time */ if (ctx->keyCtx.info.expiretime != -1) { struct iovec iov[6]; + RdbxRespWriterStartCmd startCmd; + startCmd.cmd = "PEXPIREAT"; + startCmd.key = ctx->keyCtx.key; + char keyLenStr[32], expireLenStr[32], expireStr[32]; /* PEXPIREAT */ IOV_CONST(&iov[0], "*3\r\n$9\r\nPEXPIREAT"); @@ -390,7 +417,7 @@ static RdbRes toRespEndKey(RdbParser *p, void *userData) { IOV_STRING(&iov[2], ctx->keyCtx.key, ctx->keyCtx.keyLen); IOV_LEN_AND_VAL(iov+3, ctx->keyCtx.info.expiretime, expireLenStr, expireStr); - return writevWrap(ctx, iov, 5, 1, 1); + return writevWrap(ctx, iov, 5, &startCmd, 1); } RDB_bulkCopyFree(p, ctx->keyCtx.key); @@ -410,6 +437,11 @@ static RdbRes toRespString(RdbParser *p, void *userData, RdbBulk string) { /*** fillup iovec ***/ struct iovec iov[7]; + + RdbxRespWriterStartCmd startCmd; + startCmd.cmd = "SET"; + startCmd.key = ctx->keyCtx.key; + /* write SET */ IOV_CONST(&iov[0], "*3\r\n$3\r\nSET"); /* write key */ @@ -419,18 +451,22 @@ static RdbRes toRespString(RdbParser *p, void *userData, RdbBulk string) { IOV_LENGTH(&iov[3], valLen, valLenStr); IOV_STRING(&iov[4], string, valLen); IOV_CONST(&iov[5], "\r\n"); - return writevWrap(ctx, iov, 6, 1, 1); + return writevWrap(ctx, iov, 6, &startCmd, 1); } static RdbRes toRespList(RdbParser *p, void *userData, RdbBulk item) { RdbxToResp *ctx = userData; + struct iovec iov[7]; /*** fillup iovec ***/ char keyLenStr[32], valLenStr[32]; int valLen = RDB_bulkLen(p, item); - struct iovec iov[7]; + RdbxRespWriterStartCmd startCmd; + startCmd.cmd = "RPUSH"; + startCmd.key = ctx->keyCtx.key; + /* write RPUSH */ IOV_CONST(&iov[0], "*3\r\n$5\r\nRPUSH"); /* write key */ @@ -440,10 +476,11 @@ static RdbRes toRespList(RdbParser *p, void *userData, RdbBulk item) { IOV_LENGTH(&iov[3], valLen, valLenStr); IOV_STRING(&iov[4], item, valLen); IOV_CONST(&iov[5], "\r\n"); - return writevWrap(ctx, iov, 6, 1, 1); + return writevWrap(ctx, iov, 6, &startCmd, 1); } static RdbRes toRespHash(RdbParser *p, void *userData, RdbBulk field, RdbBulk value) { + struct iovec iov[10]; RdbxToResp *ctx = userData; /*** fillup iovec ***/ @@ -452,7 +489,10 @@ static RdbRes toRespHash(RdbParser *p, void *userData, RdbBulk field, RdbBulk va int fieldLen = RDB_bulkLen(p, field); int valueLen = RDB_bulkLen(p, value); - struct iovec iov[10]; + RdbxRespWriterStartCmd startCmd; + startCmd.cmd = "HSET"; + startCmd.key = ctx->keyCtx.key; + /* write RPUSH */ IOV_CONST(&iov[0], "*4\r\n$4\r\nHSET"); /* write key */ @@ -465,16 +505,20 @@ static RdbRes toRespHash(RdbParser *p, void *userData, RdbBulk field, RdbBulk va IOV_LENGTH(&iov[5], valueLen, valueLenStr); IOV_STRING(&iov[6], value, valueLen); IOV_CONST(&iov[7], "\r\n"); - return writevWrap(ctx, iov, 8, 1, 1); + return writevWrap(ctx, iov, 8, &startCmd, 1); } static RdbRes toRespSet(RdbParser *p, void *userData, RdbBulk member) { + struct iovec iov[7]; RdbxToResp *ctx = userData; char keyLenStr[32], valLenStr[32]; int valLen = RDB_bulkLen(p, member); - struct iovec iov[7]; + RdbxRespWriterStartCmd startCmd; + startCmd.cmd = "SADD"; + startCmd.key = ctx->keyCtx.key; + /* write RPUSH */ IOV_CONST(&iov[0], "*3\r\n$4\r\nSADD"); /* write key */ @@ -484,16 +528,20 @@ static RdbRes toRespSet(RdbParser *p, void *userData, RdbBulk member) { IOV_LENGTH(&iov[3], valLen, valLenStr); IOV_STRING(&iov[4], member, valLen); IOV_CONST(&iov[5], "\r\n"); - return writevWrap(ctx, iov, 6, 1, 1); + return writevWrap(ctx, iov, 6, &startCmd, 1); } static RdbRes toRespZset(RdbParser *p, void *userData, RdbBulk member, double score) { + struct iovec iov[10]; RdbxToResp *ctx = userData; char keyLenStr[32], valLenStr[32], scoreLenStr[32]; int valLen = RDB_bulkLen(p, member); - struct iovec iov[10]; + RdbxRespWriterStartCmd startCmd; + startCmd.cmd = "ZADD"; + startCmd.key = ctx->keyCtx.key; + /* write ZADD */ IOV_CONST(&iov[0], "*4\r\n$4\r\nZADD"); /* write key */ @@ -511,7 +559,7 @@ static RdbRes toRespZset(RdbParser *p, void *userData, RdbBulk member, double sc IOV_LENGTH(&iov[5], valLen, valLenStr); IOV_STRING(&iov[6], member, valLen); IOV_CONST(&iov[7], "\r\n"); - return writevWrap(ctx, iov, 8, 1, 1); + return writevWrap(ctx, iov, 8, &startCmd, 1); } static RdbRes toRespEndRdb(RdbParser *p, void *userData) { @@ -535,13 +583,17 @@ static RdbRes toRespFunction(RdbParser *p, void *userData, RdbBulk func) { int funcLen = RDB_bulkLen(p, func); + RdbxRespWriterStartCmd startCmd; + startCmd.cmd = "FUNCTION"; + startCmd.key = ""; + struct iovec iov[4]; IOV_CONST(&iov[0], "*4\r\n$8\r\nFUNCTION\r\n$4\r\nLOAD\r\n$7\r\nREPLACE"); /* write member */ IOV_LENGTH(&iov[1], funcLen, funcLenStr); IOV_STRING(&iov[2], func, funcLen); IOV_CONST(&iov[3], "\r\n"); - return writevWrap( (RdbxToResp *) userData, iov, 4, 1, 1); + return writevWrap( (RdbxToResp *) userData, iov, 4, &startCmd, 1); } @@ -558,17 +610,25 @@ static RdbRes toRespStreamMetaData(RdbParser *p, void *userData, RdbStreamMeta * * for the Stream type. (We don't use the MAXLEN 0 trick from aof.c * because of Redis Enterprise CRDT compatibility issues - Can't XSETID "back") */ + RdbxRespWriterStartCmd startCmd; + startCmd.cmd = "XGROUP CREATE"; + startCmd.key = ctx->keyCtx.key; + IOV_CONST(&iov[0], "*6\r\n$6\r\nXGROUP\r\n$6\r\nCREATE"); IOV_LENGTH(&iov[1], ctx->keyCtx.keyLen, keyLenStr); IOV_STRING(&iov[2], ctx->keyCtx.key, ctx->keyCtx.keyLen); IOV_CONST(&iov[3], "$7\r\ndummyCG\r\n$1\r\n$\r\n$8\r\nMKSTREAM\r\n"); - IF_NOT_OK_RETURN(writevWrap( (RdbxToResp *) userData, iov, 4, 1, 1)); + IF_NOT_OK_RETURN(writevWrap( (RdbxToResp *) userData, iov, 4, &startCmd, 1)); + + /* another startCmd */ + startCmd.cmd = "XGROUP DESTROY"; + startCmd.key = ctx->keyCtx.key; IOV_CONST(&iov[0], "*4\r\n$6\r\nXGROUP\r\n$7\r\nDESTROY"); IOV_LENGTH(&iov[1], ctx->keyCtx.keyLen, keyLenStr); IOV_STRING(&iov[2], ctx->keyCtx.key, ctx->keyCtx.keyLen); IOV_CONST(&iov[3], "$7\r\ndummyCG\r\n"); - IF_NOT_OK_RETURN(writevWrap( (RdbxToResp *) userData, iov, 4, 1, 1)); + IF_NOT_OK_RETURN(writevWrap( (RdbxToResp *) userData, iov, 4, &startCmd, 1)); } /* take care to reset it for next stream-item */ @@ -577,6 +637,10 @@ static RdbRes toRespStreamMetaData(RdbParser *p, void *userData, RdbStreamMeta * int idLen = snprintf(idStr, sizeof(idStr), "%lu-%lu",meta->lastID.ms,meta->lastID.seq); int maxDelEntryIdLen = snprintf(maxDelEntryId, sizeof(maxDelEntryId), "%lu-%lu", meta->maxDelEntryID.ms, meta->maxDelEntryID.seq); + RdbxRespWriterStartCmd startCmd; + startCmd.cmd = "XSETID"; + startCmd.key = ctx->keyCtx.key; + if ((ctx->keyCtx.info.opcode >= _RDB_TYPE_STREAM_LISTPACKS_2) && (ctx->targetRedisVerVal >= VER_VAL(7, 0))) { IOV_CONST(&iov[0], "*7\r\n$6\r\nXSETID"); IOV_LENGTH(&iov[1], ctx->keyCtx.keyLen, keyLenStr); @@ -589,7 +653,7 @@ static RdbRes toRespStreamMetaData(RdbParser *p, void *userData, RdbStreamMeta * IOV_LENGTH(&iov[9], maxDelEntryIdLen, maxDelEntryIdLenStr); IOV_STRING(&iov[10], maxDelEntryId, maxDelEntryIdLen); IOV_CONST(&iov[11], "\r\n"); - return writevWrap( (RdbxToResp *) userData, iov, 12, 1, 1); + return writevWrap( (RdbxToResp *) userData, iov, 12, &startCmd, 1); } else { IOV_CONST(&iov[0], "*3\r\n$6\r\nXSETID"); IOV_LENGTH(&iov[1], ctx->keyCtx.keyLen, keyLenStr); @@ -597,13 +661,14 @@ static RdbRes toRespStreamMetaData(RdbParser *p, void *userData, RdbStreamMeta * IOV_LENGTH(&iov[3], idLen, idLenStr); IOV_STRING(&iov[4], idStr, idLen); IOV_CONST(&iov[5], "\r\n"); - return writevWrap( (RdbxToResp *) userData, iov, 6, 1, 1); + return writevWrap( (RdbxToResp *) userData, iov, 6, &startCmd, 1); } } static RdbRes toRespStreamItem(RdbParser *p, void *userData, RdbStreamID *id, RdbBulk field, RdbBulk val, int64_t itemsLeft) { char cmd[64], idStr[100], idLenStr[64], keyLenStr[32], fieldLenStr[32], valLenStr[32]; - int iovs = 0, startCmd = 0 , endCmd = 0; + int iovs = 0, endCmd = 0; + RdbxRespWriterStartCmd startCmd, *startCmdRef = NULL; struct iovec iov[15]; RdbxToResp *ctx = userData; @@ -612,6 +677,11 @@ static RdbRes toRespStreamItem(RdbParser *p, void *userData, RdbStreamID *id, Rd /* Start of (another) stream item? */ if ((ctx->streamCtx.xaddStartEndCounter % 2) == 0) { + startCmd.cmd = "XADD"; + startCmd.key = ctx->keyCtx.key; + startCmdRef = &startCmd; + + /* writev XADD */ int cmdLen = snprintf(cmd, sizeof(cmd), "*%lu\r\n$4\r\nXADD", 3 + (itemsLeft + 1) * 2); IOV_STRING(&iov[iovs++], cmd, cmdLen); IOV_LENGTH(&iov[iovs++], ctx->keyCtx.keyLen, keyLenStr); @@ -620,7 +690,6 @@ static RdbRes toRespStreamItem(RdbParser *p, void *userData, RdbStreamID *id, Rd IOV_LENGTH(&iov[iovs++], idLen, idLenStr); IOV_STRING(&iov[iovs++], idStr, idLen); - startCmd = 1; ++ctx->streamCtx.xaddStartEndCounter; } @@ -636,7 +705,7 @@ static RdbRes toRespStreamItem(RdbParser *p, void *userData, RdbStreamID *id, Rd ++ctx->streamCtx.xaddStartEndCounter; } - return writevWrap( (RdbxToResp *) userData, iov, iovs, startCmd, endCmd); + return writevWrap( (RdbxToResp *) userData, iov, iovs, startCmdRef, endCmd); } /* Emit the XGROUP CREATE in order to create the group. */ @@ -660,6 +729,11 @@ static RdbRes toRespStreamNewCGroup(RdbParser *p, void *userData, RdbBulk grpNam int idLen = snprintf(idStr, sizeof(idStr), "%lu-%lu",meta->lastId.ms,meta->lastId.seq); + RdbxRespWriterStartCmd startCmd; + startCmd.cmd = "XGROUP"; + startCmd.key = ctx->keyCtx.key; + + /* writev XGROUP */ if ( (meta->entriesRead>=0) && (ctx->targetRedisVerVal >= VER_VAL(7, 0))) { /* XGROUP CREATE */ IOV_CONST(&iov[iovs++], "*7\r\n$6\r\nXGROUP\r\n$6\r\nCREATE"); @@ -690,7 +764,7 @@ static RdbRes toRespStreamNewCGroup(RdbParser *p, void *userData, RdbBulk grpNam IOV_STRING(&iov[iovs++], idStr, idLen); IOV_CONST(&iov[iovs++], "\r\n"); } - return writevWrap(ctx, iov, iovs, 1, 1); + return writevWrap(ctx, iov, iovs, &startCmd, 1); } static RdbRes toRespStreamCGroupPendingEntry(RdbParser *p, void *userData, RdbStreamPendingEntry *pendingEntry) { @@ -737,7 +811,11 @@ static RdbRes toRespStreamConsumerPendingEntry(RdbParser *p, void *userData, Rdb return (RdbRes) RDBX_ERR_STREAM_INTEG_CHECK; } - /* XCLAIM */ + RdbxRespWriterStartCmd startCmd; + startCmd.cmd = "XCLAIM"; + startCmd.key = ctx->keyCtx.key; + + /* writev XCLAIM */ IOV_CONST(&iov[iovs++], "*12\r\n$6\r\nXCLAIM"); /* key */ IOV_LENGTH(&iov[iovs++], ctx->keyCtx.keyLen, keyLenStr); @@ -758,7 +836,7 @@ static RdbRes toRespStreamConsumerPendingEntry(RdbParser *p, void *userData, Rdb idLen, idStr, sentTimeLen, sentTime, sentCountLen, sentCount); /* max: 2 + 2 + 1 + 3 + 21*2+1 + 2 + 4 + 3 + 21 + 2 + 10 + 3 +21 +2 + 6 + 2 +5 + 2*16 */ IOV_STRING(&iov[iovs++], cmdTrailer, cmdTrailerLen); - return writevWrap(ctx, iov, iovs, 1, 1); + return writevWrap(ctx, iov, iovs, &startCmd, 1); } /*** Handling raw (RESTORE) ***/ @@ -834,7 +912,7 @@ static RdbRes toRespRestoreFrag(RdbParser *p, void *userData, RdbBulk frag) { } IOV_STRING(&iov[iovs++], frag, fragLen); - return writevWrap(ctx, iov, iovs, 1, 0); + return writevWrap(ctx, iov, iovs, NULL, 0); } /* This call will be followed one or more calls to toRespRestoreFrag() which indicates @@ -900,7 +978,7 @@ static RdbRes toRespRestoreFragEnd(RdbParser *p, void *userData) { } struct iovec iov = {cmd, len}; - return writevWrap(ctx, &iov, 1, 0, 1); + return writevWrap(ctx, &iov, 1, NULL, 1); } /*** LIB API functions ***/ diff --git a/src/ext/respToFileWriter.c b/src/ext/respToFileWriter.c index 86c8572..be65656 100644 --- a/src/ext/respToFileWriter.c +++ b/src/ext/respToFileWriter.c @@ -10,7 +10,9 @@ struct RdbxRespToFileWriter { }; /* return 0 for success. 1 Otherwise. */ -static int respFileWritev(void *context, struct iovec *iov, int count, int startCmd, int endCmd) { +static int respFileWritev(void *context, struct iovec *iov, int count, + RdbxRespWriterStartCmd *startCmd, int endCmd) +{ UNUSED(startCmd); struct RdbxRespToFileWriter *ctx = context; ctx->cmdCount += endCmd; diff --git a/src/ext/respToRedisLoader.c b/src/ext/respToRedisLoader.c index b242b44..410091d 100644 --- a/src/ext/respToRedisLoader.c +++ b/src/ext/respToRedisLoader.c @@ -1,6 +1,7 @@ #include #include #include +#include #include #include #include @@ -17,7 +18,7 @@ #define PIPELINE_DEPTH_MAX 1000 /* limit the max value allowed to configure for pipeline depth */ #define NUM_RECORDED_CMDS 400 /* Number of commands to backlog, in a cyclic array */ -#define RECORDED_DATA_MAX_LEN 40 /* Maximum payload size from any command to record into cyclic array */ +#define RECORDED_KEY_MAX_LEN 40 /* Maximum payload size from any command to record into cyclic array */ #define REPLY_BUFF_SIZE 1024 /* reply buffer size */ @@ -29,40 +30,28 @@ struct RdbxRespToRedisLoader { struct { int num; int pipelineDepth; - char cmdPrefix[NUM_RECORDED_CMDS][RECORDED_DATA_MAX_LEN]; + /* pointers to (static) strings that hold the template of the command sent (no char* allocation required) */ + const char *cmd[NUM_RECORDED_CMDS]; + /* strncpy() of the key sent */ + char key[NUM_RECORDED_CMDS][RECORDED_KEY_MAX_LEN]; } pendingCmds; RespReaderCtx respReader; RdbParser *p; int fd; + int fdOwner; /* Set to 1 if this entity created the socket, and it is the one to release. */ }; static void onReadRepliesError(RdbxRespToRedisLoader *ctx) { RespReaderCtx *respReader = &ctx->respReader; int currIdx = ctx->respReader.countReplies % NUM_RECORDED_CMDS; - char *currCmdRecord = ctx->pendingCmds.cmdPrefix[currIdx]; - - /* Print also previous command if available. */ - if (ctx->respReader.countReplies > 1) { - int prevIdx = (currIdx == 0) ? NUM_RECORDED_CMDS - 1 : currIdx - 1; - char *prevCmdRecord = ctx->pendingCmds.cmdPrefix[prevIdx]; - RDB_reportError(ctx->p, (RdbRes) RDBX_ERR_RESP_WRITE, - "\nReceived Server error: \"%s\"\nGot failed on command [#%d] (First %d bytes):\n%s\n" - "\nPreceding command [#%d] was: \n%s\n", - respReader->errorMsg, - ctx->respReader.countReplies, - RECORDED_DATA_MAX_LEN, - currCmdRecord, - ctx->respReader.countReplies-1, - prevCmdRecord); - } else { - RDB_reportError(ctx->p, (RdbRes) RDBX_ERR_RESP_WRITE, - "\nReceived Server error:\n\"%s\"\n\nGot failed on command [#%d] (First %d bytes):\n%s\n", - respReader->errorMsg, - ctx->respReader.countReplies, - RECORDED_DATA_MAX_LEN, - currCmdRecord); - } + + RDB_reportError(ctx->p, (RdbRes) RDBX_ERR_RESP_WRITE, + "\nerror from dst '-%s' on key '%s' on command '%s' (RESP Command #%zu)\n", + respReader->errorMsg, + ctx->pendingCmds.key[currIdx], + ctx->pendingCmds.cmd[currIdx], + ctx->respReader.countReplies); } /* Read 'numToRead' replies from the socket. * Return 0 for success, 1 otherwise. */ @@ -97,28 +86,20 @@ static int readReplies(RdbxRespToRedisLoader *ctx, int numToRead) { } /* For debugging, record the command into the cyclic array before sending it */ -static inline void recordNewCmd(RdbxRespToRedisLoader *ctx, const struct iovec *cmd_iov, int iovcnt) { +static inline void recordCommandSent(RdbxRespToRedisLoader *ctx,RdbxRespWriterStartCmd *cmd) { int recordCmdEntry = (ctx->respReader.countReplies + ctx->pendingCmds.num) % NUM_RECORDED_CMDS; - char *recordCmdPrefixAt = ctx->pendingCmds.cmdPrefix[recordCmdEntry]; - - int copiedBytes = 0, bytesToCopy = RECORDED_DATA_MAX_LEN - 1; - - const struct iovec* currentIov = cmd_iov; - for (int i = 0; i < iovcnt && bytesToCopy; ++i) { - int slice = (currentIov->iov_len >= ((size_t)bytesToCopy)) ? bytesToCopy : (int) currentIov->iov_len; - for (int j = 0 ; j < slice ; ) - recordCmdPrefixAt[copiedBytes++] = ((char *)currentIov->iov_base)[j++]; - - bytesToCopy -= slice; - ++currentIov; - } - recordCmdPrefixAt[copiedBytes] = '\0'; + /* no need to copy the cmd. handlersToResp took care to pass a string that is persistent and constant */ + ctx->pendingCmds.cmd[recordCmdEntry] = cmd->cmd; + strncpy(ctx->pendingCmds.key[recordCmdEntry], cmd->key, RECORDED_KEY_MAX_LEN-1); + ctx->pendingCmds.key[recordCmdEntry][RECORDED_KEY_MAX_LEN-1] = '\0'; } /* Write the vector of data to the socket with writev() sys-call. * Return 0 for success, 1 otherwise. */ -static int redisLoaderWritev(void *context, struct iovec *iov, int iovCnt, int startCmd, int endCmd) { +static int redisLoaderWritev(void *context, struct iovec *iov, int iovCnt, + RdbxRespWriterStartCmd *startCmd, int endCmd) +{ ssize_t writeResult; int retries = 0; @@ -129,8 +110,7 @@ static int redisLoaderWritev(void *context, struct iovec *iov, int iovCnt, int s return 1; } - if (startCmd) - recordNewCmd(ctx, iov, iovCnt); + if (startCmd) recordCommandSent(ctx, startCmd); while (1) { @@ -188,7 +168,9 @@ static void redisLoaderDelete(void *context) { /* not required to flush on termination */ shutdown(ctx->fd, SHUT_WR); /* graceful shutdown */ - close(ctx->fd); + + if (ctx->fdOwner) close(ctx->fd); + RDB_free(ctx->p, ctx); } @@ -201,6 +183,10 @@ static RdbRes redisAuthCustomized(RdbxRespToRedisLoader *ctx, RdbxRedisAuth *aut char prefix[32]; + RdbxRespWriterStartCmd startCmd; + startCmd.cmd = ""; + startCmd.key = ""; + /* allocate iovec (2 for header and trailer. 3 for each argument) */ struct iovec *iov = (struct iovec *)malloc((auth->cmd.argc * 3 + 2) * sizeof(struct iovec)); /* allocate temporary buffer to assist converting length to string of all args */ @@ -225,7 +211,7 @@ static RdbRes redisAuthCustomized(RdbxRespToRedisLoader *ctx, RdbxRedisAuth *aut IOV_STRING(&iov[iovs++], auth->cmd.argv[i], tLen); } IOV_CONST(&iov[iovs++], "\r\n"); - redisLoaderWritev(ctx, iov, iovs, 1, 1); + redisLoaderWritev(ctx, iov, iovs, &startCmd, 1); AuthEnd: if (iov) free(iov); @@ -245,6 +231,11 @@ static RdbRes redisAuth(RdbxRespToRedisLoader *ctx, RdbxRedisAuth *auth) { return redisAuthCustomized(ctx, auth); /* AUTH [username] password */ + + RdbxRespWriterStartCmd startCmd; + startCmd.cmd = "AUTH"; + startCmd.key = ""; + struct iovec iov[10]; if (auth->user) { IOV_CONST(&iov[0], "*3\r\n$4\r\nauth\r\n$"); @@ -266,7 +257,7 @@ static RdbRes redisAuth(RdbxRespToRedisLoader *ctx, RdbxRedisAuth *auth) { iovs = 4; } - redisLoaderWritev(ctx, iov, iovs, 1, 1); + redisLoaderWritev(ctx, iov, iovs, &startCmd, 1); return RDB_OK; } @@ -282,7 +273,6 @@ _LIBRDB_API RdbxRespToRedisLoader *RDBX_createRespToRedisFd(RdbParser *p, int fd) { RdbxRespToRedisLoader *ctx; if ((ctx = RDB_alloc(p, sizeof(RdbxRespToRedisLoader))) == NULL) { - close(fd); RDB_reportError(p, (RdbRes) RDBX_ERR_RESP_FAILED_ALLOC, "Failed to allocate struct RdbxRespToRedisLoader"); return NULL; @@ -292,6 +282,7 @@ _LIBRDB_API RdbxRespToRedisLoader *RDBX_createRespToRedisFd(RdbParser *p, memset(ctx, 0, sizeof(RdbxRespToRedisLoader)); ctx->p = p; ctx->fd = fd; + ctx->fdOwner = 0; ctx->pendingCmds.num = 0; ctx->pendingCmds.pipelineDepth = PIPELINE_DEPTH_DEF; readRespInit(&ctx->respReader); @@ -322,17 +313,26 @@ _LIBRDB_API RdbxRespToRedisLoader *RDBX_createRespToRedisTcp(RdbParser *p, server_addr.sin_port = htons(port); if (inet_pton(AF_INET, hostname, &(server_addr.sin_addr)) <= 0) { RDB_reportError(p, (RdbRes) RDBX_ERR_RESP2REDIS_INVALID_ADDRESS, - "Invalid tcp address (hostname=%s, port=%d)", hostname, port); - close(sockfd); - return NULL; + "Failed to convert IP address. inet_pton(hostname=%s, port=%d) => errno=%d", + hostname, port, errno); + goto createErr; } if (connect(sockfd, (struct sockaddr *) &server_addr, sizeof(server_addr)) == -1) { RDB_reportError(p, (RdbRes) RDBX_ERR_RESP2REDIS_INVALID_ADDRESS, - "Invalid tcp address (hostname=%s, port=%d)", hostname, port); - close(sockfd); - return NULL; + "Failed to connect(hostname=%s, port=%d) => errno=%d", + hostname, port, errno); + goto createErr; } - return RDBX_createRespToRedisFd(p, rdbToResp, auth, sockfd); + RdbxRespToRedisLoader *res = RDBX_createRespToRedisFd(p, rdbToResp, auth, sockfd); + + if (!res) goto createErr; + + res->fdOwner = 1; + return res; + +createErr: + close(sockfd); + return NULL; } diff --git a/src/lib/parser.c b/src/lib/parser.c index 71ad5f2..9bed8ba 100644 --- a/src/lib/parser.c +++ b/src/lib/parser.c @@ -164,6 +164,7 @@ static RdbHandlers *createHandlersCommon(RdbParser *p, void *userData, RdbFreeFu static void loggerCbDefault(RdbLogLevel l, const char *msg); static inline RdbStatus updateStateAfterParse(RdbParser *p, RdbStatus status); static void printParserState(RdbParser *p); +static uint64_t dummyCrcFunc(uint64_t crc, const unsigned char *s, uint64_t l); static inline void restoreEmbeddedBulk(RdbParser *p, EmbeddedBulk *embeddedBulk); static void *allocEmbeddedBulk(RdbParser *p, @@ -235,6 +236,7 @@ _LIBRDB_API RdbParser *RDB_createParserRdb(RdbMemAlloc *memAlloc) { p->currOpcode = UINT32_MAX; p->deepIntegCheck = 1; p->ignoreChecksum = 0; + p->crcFunc = crc64; /*** RDB_parseBuff related data ***/ p->isParseFromBuff = 0; @@ -392,6 +394,7 @@ _LIBRDB_API void RDB_setLogger(RdbParser *p, RdbLoggerCB f) { _LIBRDB_API void RDB_IgnoreChecksum(RdbParser *p) { p->ignoreChecksum = 1; + p->crcFunc = dummyCrcFunc; } _LIBRDB_API void RDB_setMaxRawSize(RdbParser *p, size_t size) { @@ -571,6 +574,11 @@ _LIBRDB_API int RDB_getMaxSuppportRdbVersion(void) { /*** various functions ***/ +static uint64_t dummyCrcFunc(uint64_t crc, const unsigned char *s, uint64_t l) { + UNUSED(crc, s, l); + return 0; +} + static const char *getStatusString(RdbStatus status) { switch ((int) status) { case RDB_STATUS_OK: return "OK"; @@ -728,8 +736,8 @@ static void chainHandlersAcrossLevels(RdbParser *p) { static void resolveMultipleLevelsRegistration(RdbParser *p) { /* find the lowest level that handlers are registered */ - int lvl = (p->numHandlers[0]) ? RDB_LEVEL_RAW : - (p->numHandlers[1]) ? RDB_LEVEL_STRUCT : + int lvl = (p->numHandlers[RDB_LEVEL_RAW]) ? RDB_LEVEL_RAW : + (p->numHandlers[RDB_LEVEL_STRUCT]) ? RDB_LEVEL_STRUCT : RDB_LEVEL_DATA ; for (int i = 0 ; i < RDB_OPCODE_MAX ; ++i) { @@ -739,6 +747,21 @@ static void resolveMultipleLevelsRegistration(RdbParser *p) { } } +static void attachDebugHandlers(RdbParser *p) { + RDB_setLogLevel(p, RDB_LOG_DBG); + /* find the lowest level that handlers are registered and register DBG handlers accordingly */ + if (p->numHandlers[RDB_LEVEL_RAW]) { + RdbHandlersRawCallbacks cb = {.handleNewKey = handleNewKeyPrintDbg, .handleEndKey = handleEndKeyPrintDbg}; + RDB_createHandlersRaw(p, &cb, NULL, NULL); + } else if (p->numHandlers[RDB_LEVEL_STRUCT]) { + RdbHandlersStructCallbacks cb = {.handleNewKey = handleNewKeyPrintDbg, .handleEndKey = handleEndKeyPrintDbg}; + RDB_createHandlersStruct(p, &cb, NULL, NULL); + } else { + RdbHandlersDataCallbacks cb = {.handleNewKey = handleNewKeyPrintDbg, .handleEndKey = handleEndKeyPrintDbg}; + RDB_createHandlersData(p, &cb, NULL, NULL); + } +} + static RdbStatus finalizeConfig(RdbParser *p, int isParseFromBuff) { assert(p->state == RDB_STATE_CONFIGURING); @@ -746,11 +769,9 @@ static RdbStatus finalizeConfig(RdbParser *p, int isParseFromBuff) { crc64_init_thread_safe(); - if ((p->debugData = getEnvVar(ENV_VAR_DEBUG_DATA, 0)) != 0) { - RDB_setLogLevel(p, RDB_LOG_DBG); - RdbHandlersDataCallbacks cb = {.handleNewKey = handleNewKeyPrintDbg, .handleEndKey = handleEndKeyPrintDbg}; - RDB_createHandlersData(p, &cb, NULL, NULL); - } + /* debug handlers to further printout processed data */ + if ((p->debugData = getEnvVar(ENV_VAR_DEBUG_DATA, 0)) != 0) + attachDebugHandlers(p); p->isParseFromBuff = isParseFromBuff; @@ -2621,7 +2642,7 @@ static RdbStatus readRdbFromReader(RdbParser *p, size_t len, AllocTypeRq type, c /* done read entirely. Eval crc of entire read */ (*binfo)->written = DONE_FILL_BULK; - p->checksum = crc64(p->checksum, (unsigned char *) (*binfo)->ref, len); + p->checksum = p->crcFunc(p->checksum, (unsigned char *) (*binfo)->ref, len); return res; } } else { @@ -2629,7 +2650,7 @@ static RdbStatus readRdbFromReader(RdbParser *p, size_t len, AllocTypeRq type, c /* Got last time WAIT_MORE_DATA. assumed async read filled it up */ /* After WAIT_MORE_DATA we cannot eval crc. Update it now. */ - p->checksum = crc64(p->checksum, (unsigned char *) (*binfo)->ref, len); + p->checksum = p->crcFunc(p->checksum, (unsigned char *) (*binfo)->ref, len); (*binfo)->written = DONE_FILL_BULK; } return RDB_STATUS_OK; @@ -2680,7 +2701,7 @@ static RdbStatus readRdbFromBuff(RdbParser *p, size_t len, AllocTypeRq type, cha memcpy(((char *) (*binfo)->ref) + (*binfo)->written, p->parsebuffCtx.at, leftToFillItem); - p->checksum = crc64(p->checksum, (*binfo)->ref, len); + p->checksum = p->crcFunc(p->checksum, (*binfo)->ref, len); p->parsebuffCtx.at += leftToFillItem; (*binfo)->written = DONE_FILL_BULK; diff --git a/src/lib/parser.h b/src/lib/parser.h index a0bf7dc..378f293 100644 --- a/src/lib/parser.h +++ b/src/lib/parser.h @@ -73,6 +73,8 @@ static inline void unused(void *dummy, ...) { (void)(dummy);} finalize_cmd; \ } while (0) +typedef uint64_t (*CrcFunc)(uint64_t, const unsigned char *, uint64_t); + typedef enum BulkType { BULK_TYPE_STACK, /* from stack bulk */ BULK_TYPE_HEAP, /* from heap bulk */ @@ -355,6 +357,7 @@ struct RdbParser { RdbMemAlloc mem; int deepIntegCheck; int ignoreChecksum; + CrcFunc crcFunc; RdbLoggerCB loggerCb; RdbLogLevel logLevel; size_t maxRawSize; diff --git a/test/dumps/invalid_chksum_v8.rdb b/test/dumps/invalid_chksum_v8.rdb new file mode 100644 index 0000000..db253ee Binary files /dev/null and b/test/dumps/invalid_chksum_v8.rdb differ diff --git a/test/test_main.c b/test/test_main.c index 37fa1ad..75966bf 100644 --- a/test/test_main.c +++ b/test/test_main.c @@ -81,6 +81,30 @@ static void test_mixed_levels_registration(void **state) { assert_json_equal(jsonfileData, DUMP_FOLDER("multiple_lists_strings_subset_list.json"), 1); } +static void test_checksum(void **state) { + RdbParser *parser; + RdbStatus status; + UNUSED(state); + const char *rdbfile = DUMP_FOLDER("invalid_chksum_v8.rdb"); + + /* fail on checksum error */ + parser = RDB_createParserRdb(NULL); + RDB_setLogLevel(parser, RDB_LOG_ERR); + assert_non_null(RDBX_createReaderFile(parser, rdbfile)); + while ((status = RDB_parse(parser)) == RDB_STATUS_WAIT_MORE_DATA); + assert_int_equal( status, RDB_STATUS_ERROR); + assert_int_equal(RDB_getErrorCode(parser), RDB_ERR_CHECKSUM_FAILURE); + RDB_deleteParser(parser); + + /* ignore checksum error */ + parser = RDB_createParserRdb(NULL); + assert_non_null(RDBX_createReaderFile(parser, rdbfile)); + RDB_IgnoreChecksum(parser); + while ((status = RDB_parse(parser)) == RDB_STATUS_WAIT_MORE_DATA); + assert_int_equal( status, RDB_STATUS_OK); + RDB_deleteParser(parser); +} + static void test_examples(void **state) { UNUSED(state); runSystemCmd("make example > /dev/null "); @@ -121,13 +145,14 @@ int group_examples(void) { return cmocka_run_group_tests(tests, NULL, NULL); } -/*************************** group_main *******************************/ -int group_main(void) { +/*************************** group_misc *******************************/ +int group_misc(void) { /* Insert here your test functions */ const struct CMUnitTest tests[] = { cmocka_unit_test(test_createReader_missingFile), cmocka_unit_test(test_empty_rdb), cmocka_unit_test(test_mixed_levels_registration), + cmocka_unit_test(test_checksum), }; return cmocka_run_group_tests(tests, NULL, NULL); } @@ -183,7 +208,7 @@ int main(int argc, char *argv[]) { RUN_TEST_GROUP(group_examples); RUN_TEST_GROUP(group_test_resp_reader); RUN_TEST_GROUP(group_rdb_to_resp); - RUN_TEST_GROUP(group_main); + RUN_TEST_GROUP(group_misc); RUN_TEST_GROUP(group_rdb_to_json); RUN_TEST_GROUP(group_mem_management); RUN_TEST_GROUP(group_bulk_ops); @@ -193,7 +218,7 @@ int main(int argc, char *argv[]) { printf("\n*************** SIMULATING WAIT_MORE_DATA *******************\n"); setEnvVar("LIBRDB_SIM_WAIT_MORE_DATA", "1"); - RUN_TEST_GROUP(group_main); + RUN_TEST_GROUP(group_misc); RUN_TEST_GROUP(group_rdb_to_resp); RUN_TEST_GROUP(group_rdb_to_json); RUN_TEST_GROUP(group_mem_management); diff --git a/test/test_rdb_cli.c b/test/test_rdb_cli.c index 611ddc5..52edd1a 100644 --- a/test/test_rdb_cli.c +++ b/test/test_rdb_cli.c @@ -130,6 +130,11 @@ static void test_rdb_cli_filter_mix(void **state) { runSystemCmd(" ./bin/rdb-cli ./test/dumps/multiple_lists_strings.rdb -t str -k string json -f | grep list3 && exit 1 || exit 0 > /dev/null "); } +static void test_rdb_cli_input_fd_reader(void **state) { + UNUSED(state); + runSystemCmd(" cat ./test/dumps/single_key.rdb | ./bin/rdb-cli - json | grep xxx > /dev/null "); +} + static void test_rdb_cli_redis_auth(void **state) { UNUSED(state); /* check password authentication */ @@ -175,6 +180,7 @@ int group_test_rdb_cli(void) { cmocka_unit_test_setup(test_rdb_cli_filter_invalid_input, setupTest), cmocka_unit_test_setup(test_rdb_cli_filter_type, setupTest), cmocka_unit_test_setup(test_rdb_cli_filter_mix, setupTest), + cmocka_unit_test_setup(test_rdb_cli_input_fd_reader, setupTest), cmocka_unit_test_setup(test_rdb_cli_redis_auth, setupTest), };