diff --git a/api/librdb-api.h b/api/librdb-api.h index 5fb910d..cccd411 100644 --- a/api/librdb-api.h +++ b/api/librdb-api.h @@ -48,6 +48,9 @@ typedef enum RdbRes { RDB_ERR_INVALID_CONFIGURATION, RDB_ERR_FAILED_CREATE_PARSER, RDB_ERR_FAILED_OPEN_LOG_FILE, + RDB_ERR_FAILED_GET_FD_FLAGS, + RDB_ERR_NONBLOCKING_FD, + RDB_ERR_NONBLOCKING_READ_FD, RDB_ERR_FAILED_READ_RDB_FILE, RDB_ERR_FAILED_OPEN_RDB_FILE, RDB_ERR_WRONG_FILE_SIGNATURE, @@ -351,6 +354,14 @@ _LIBRDB_API RdbStatus RDB_parseBuff(RdbParser *p, * * Used by: RDBX_createReaderFile * + * + * TODO: The parser only supports reading asynchrnously (non-blocking) + * through RDB_parseBuff() API. It is required to Extend parser for + * readers to support non-blocking mode as well. Currently the provided + * reader-function (RdbReaderFunc) can only: + * - Read the entire request and return RDB_STATUS_OK + * - Or, read none and return RDB_STATUS_WAIT_MORE_DATA + * - Or, return RDB_STATUS_ERROR ****************************************************************/ _LIBRDB_API RdbReader *RDB_createReaderRdb(RdbParser *p, RdbReaderFunc r, diff --git a/api/librdb-ext-api.h b/api/librdb-ext-api.h index b87c5d8..3b872f7 100644 --- a/api/librdb-ext-api.h +++ b/api/librdb-ext-api.h @@ -54,6 +54,14 @@ typedef enum { /**************************************************************** * Create RDB Reader + * + * Creation of RDB reader based on filename or file-descriptor + * + * Note: File-descriptor must be set to blocking mode. + * + * TODO: The parser only supports reading asynchronously (non-blocking) + * through RDB_parseBuff() API. It is required to Extend parser for + * readers to support non-blocking mode as well. ****************************************************************/ _LIBRDB_API RdbxReaderFile *RDBX_createReaderFile(RdbParser *parser, const char *filename); diff --git a/src/ext/readerFile.c b/src/ext/readerFile.c index 443f22c..2c97383 100644 --- a/src/ext/readerFile.c +++ b/src/ext/readerFile.c @@ -24,6 +24,7 @@ static void deleteReaderFile(RdbParser *p, void *rdata) { RDB_free(p, readerData); } +/* Attempts to read entire len, otherwise returns error */ static RdbStatus readFile(void *data, void *buf, size_t len) { RdbxReaderFile *readerFile = data; size_t readLen = fread(buf, sizeof(char), len, readerFile->file); diff --git a/src/ext/readerFileDesc.c b/src/ext/readerFileDesc.c index 841169b..f31be77 100644 --- a/src/ext/readerFileDesc.c +++ b/src/ext/readerFileDesc.c @@ -1,5 +1,8 @@ #include +#include #include +#include +#include #include #include "common.h" @@ -18,12 +21,46 @@ static void deleteReaderFileDesc(RdbParser *p, void *rdata) { RDB_free(p, readerData); } +/* Attempts to read entire len, otherwise returns error */ static RdbStatus readFileDesc(void *data, void *buf, size_t len) { + RdbxReaderFileDesc *ctx = (RdbxReaderFileDesc *)data; + size_t totalBytesRead = 0; - RdbxReaderFileDesc *ctx = (RdbxReaderFileDesc *) data; - ssize_t bytesRead = read(ctx->fd, buf, len); - if (bytesRead == -1) { - RDB_reportError(ctx->parser, RDB_ERR_FAILED_READ_RDB_FILE, NULL); + while (totalBytesRead < len) { + ssize_t bytesRead = read(ctx->fd, (char *)buf + totalBytesRead, len - totalBytesRead); + + /* read some data */ + if (likely(bytesRead > 0)) { + totalBytesRead += bytesRead; + break; + } + + /* didn't read any data. Stop. */ + if (bytesRead == 0) { + break; + } + + assert(bytesRead == -1); + + /* If interrupted, retry read */ + if (errno == EINTR) + continue; + + /* Wrongly configured to nonblocking mode (Not supported at the moment) */ + if (errno == EAGAIN || errno == EWOULDBLOCK) { + RDB_reportError(ctx->parser, RDB_ERR_NONBLOCKING_READ_FD, + "readFileDesc(): Unexpected EAGAIN|EWOULDBLOCK. The fd must be set to blocking mode"); + return RDB_STATUS_ERROR; + } + + RDB_reportError(ctx->parser, RDB_ERR_FAILED_READ_RDB_FILE, + "readFileDesc(): Read failed with errno=%d", errno); + return RDB_STATUS_ERROR; + } + + if (totalBytesRead < len) { + RDB_reportError(ctx->parser, RDB_ERR_FAILED_READ_RDB_FILE, + "readFileDesc(): Not all requested bytes were read"); return RDB_STATUS_ERROR; } @@ -31,6 +68,22 @@ static RdbStatus readFileDesc(void *data, void *buf, size_t len) { } RdbxReaderFileDesc *RDBX_createReaderFileDesc(RdbParser *p, int fd, int fdCloseWhenDone) { + + int flags = fcntl(fd, F_GETFL); + + if (flags==-1) { + + RDB_reportError(p, RDB_ERR_FAILED_GET_FD_FLAGS, + "RDBX_createReaderFileDesc(): Error getting file descriptor flags"); + return NULL; + } + + if (flags & O_NONBLOCK) { + RDB_reportError(p, RDB_ERR_NONBLOCKING_FD, + "RDBX_createReaderFileDesc(): fd must be set to blocking mode"); + return NULL; + } + RdbxReaderFileDesc *ctx = (RdbxReaderFileDesc *) RDB_alloc(p, sizeof(RdbxReaderFileDesc)); ctx->parser = p; ctx->fd = fd; diff --git a/src/lib/parserRaw.c b/src/lib/parserRaw.c index 31d00fa..e2b59c7 100644 --- a/src/lib/parserRaw.c +++ b/src/lib/parserRaw.c @@ -845,7 +845,7 @@ RdbStatus elementRawStreamLP(RdbParser *p) { return RDB_STATUS_ERROR; } } - updateElementState(p, ST_LOAD_NEXT_LP_IS_MORE, 1); /* fall-thru */ + return updateElementState(p, ST_LOAD_NEXT_LP_IS_MORE, 1); case ST_LOAD_METADATA: { uint64_t dummyVal;