Skip to content

Commit

Permalink
Fix filedesc + Fix state transition of stream (#33)
Browse files Browse the repository at this point in the history
- Fix file descriptor reader. Was written naively without checking what happen if not all the data was read or for EINTR.
- Fix wrong fall-through in the states of stream element.
I will add tests on another commit.
  • Loading branch information
moticless authored Dec 4, 2023
1 parent 739bdfd commit 20f5f64
Show file tree
Hide file tree
Showing 5 changed files with 78 additions and 5 deletions.
11 changes: 11 additions & 0 deletions api/librdb-api.h
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,9 @@ typedef enum RdbRes {
RDB_ERR_INVALID_CONFIGURATION,
RDB_ERR_FAILED_CREATE_PARSER,
RDB_ERR_FAILED_OPEN_LOG_FILE,
RDB_ERR_FAILED_GET_FD_FLAGS,
RDB_ERR_NONBLOCKING_FD,
RDB_ERR_NONBLOCKING_READ_FD,
RDB_ERR_FAILED_READ_RDB_FILE,
RDB_ERR_FAILED_OPEN_RDB_FILE,
RDB_ERR_WRONG_FILE_SIGNATURE,
Expand Down Expand Up @@ -351,6 +354,14 @@ _LIBRDB_API RdbStatus RDB_parseBuff(RdbParser *p,
*
* Used by: RDBX_createReaderFile
* <user-defined-reader>
*
* TODO: The parser only supports reading asynchrnously (non-blocking)
* through RDB_parseBuff() API. It is required to Extend parser for
* readers to support non-blocking mode as well. Currently the provided
* reader-function (RdbReaderFunc) can only:
* - Read the entire request and return RDB_STATUS_OK
* - Or, read none and return RDB_STATUS_WAIT_MORE_DATA
* - Or, return RDB_STATUS_ERROR
****************************************************************/
_LIBRDB_API RdbReader *RDB_createReaderRdb(RdbParser *p,
RdbReaderFunc r,
Expand Down
8 changes: 8 additions & 0 deletions api/librdb-ext-api.h
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,14 @@ typedef enum {

/****************************************************************
* Create RDB Reader
*
* Creation of RDB reader based on filename or file-descriptor
*
* Note: File-descriptor must be set to blocking mode.
*
* TODO: The parser only supports reading asynchronously (non-blocking)
* through RDB_parseBuff() API. It is required to Extend parser for
* readers to support non-blocking mode as well.
****************************************************************/

_LIBRDB_API RdbxReaderFile *RDBX_createReaderFile(RdbParser *parser, const char *filename);
Expand Down
1 change: 1 addition & 0 deletions src/ext/readerFile.c
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ static void deleteReaderFile(RdbParser *p, void *rdata) {
RDB_free(p, readerData);
}

/* Attempts to read entire len, otherwise returns error */
static RdbStatus readFile(void *data, void *buf, size_t len) {
RdbxReaderFile *readerFile = data;
size_t readLen = fread(buf, sizeof(char), len, readerFile->file);
Expand Down
61 changes: 57 additions & 4 deletions src/ext/readerFileDesc.c
Original file line number Diff line number Diff line change
@@ -1,5 +1,8 @@
#include <stdio.h>
#include <fcntl.h>
#include <string.h>
#include <assert.h>
#include <errno.h>
#include <unistd.h>
#include "common.h"

Expand All @@ -18,19 +21,69 @@ static void deleteReaderFileDesc(RdbParser *p, void *rdata) {
RDB_free(p, readerData);
}

/* Attempts to read entire len, otherwise returns error */
static RdbStatus readFileDesc(void *data, void *buf, size_t len) {
RdbxReaderFileDesc *ctx = (RdbxReaderFileDesc *)data;
size_t totalBytesRead = 0;

RdbxReaderFileDesc *ctx = (RdbxReaderFileDesc *) data;
ssize_t bytesRead = read(ctx->fd, buf, len);
if (bytesRead == -1) {
RDB_reportError(ctx->parser, RDB_ERR_FAILED_READ_RDB_FILE, NULL);
while (totalBytesRead < len) {
ssize_t bytesRead = read(ctx->fd, (char *)buf + totalBytesRead, len - totalBytesRead);

/* read some data */
if (likely(bytesRead > 0)) {
totalBytesRead += bytesRead;
break;
}

/* didn't read any data. Stop. */
if (bytesRead == 0) {
break;
}

assert(bytesRead == -1);

/* If interrupted, retry read */
if (errno == EINTR)
continue;

/* Wrongly configured to nonblocking mode (Not supported at the moment) */
if (errno == EAGAIN || errno == EWOULDBLOCK) {
RDB_reportError(ctx->parser, RDB_ERR_NONBLOCKING_READ_FD,
"readFileDesc(): Unexpected EAGAIN|EWOULDBLOCK. The fd must be set to blocking mode");
return RDB_STATUS_ERROR;
}

RDB_reportError(ctx->parser, RDB_ERR_FAILED_READ_RDB_FILE,
"readFileDesc(): Read failed with errno=%d", errno);
return RDB_STATUS_ERROR;
}

if (totalBytesRead < len) {
RDB_reportError(ctx->parser, RDB_ERR_FAILED_READ_RDB_FILE,
"readFileDesc(): Not all requested bytes were read");
return RDB_STATUS_ERROR;
}

return RDB_STATUS_OK;
}

RdbxReaderFileDesc *RDBX_createReaderFileDesc(RdbParser *p, int fd, int fdCloseWhenDone) {

int flags = fcntl(fd, F_GETFL);

if (flags==-1) {

RDB_reportError(p, RDB_ERR_FAILED_GET_FD_FLAGS,
"RDBX_createReaderFileDesc(): Error getting file descriptor flags");
return NULL;
}

if (flags & O_NONBLOCK) {
RDB_reportError(p, RDB_ERR_NONBLOCKING_FD,
"RDBX_createReaderFileDesc(): fd must be set to blocking mode");
return NULL;
}

RdbxReaderFileDesc *ctx = (RdbxReaderFileDesc *) RDB_alloc(p, sizeof(RdbxReaderFileDesc));
ctx->parser = p;
ctx->fd = fd;
Expand Down
2 changes: 1 addition & 1 deletion src/lib/parserRaw.c
Original file line number Diff line number Diff line change
Expand Up @@ -845,7 +845,7 @@ RdbStatus elementRawStreamLP(RdbParser *p) {
return RDB_STATUS_ERROR;
}
}
updateElementState(p, ST_LOAD_NEXT_LP_IS_MORE, 1); /* fall-thru */
return updateElementState(p, ST_LOAD_NEXT_LP_IS_MORE, 1);

case ST_LOAD_METADATA: {
uint64_t dummyVal;
Expand Down

0 comments on commit 20f5f64

Please sign in to comment.