Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Enable pipe direction for all send types #67

Merged
merged 2 commits into from
Nov 29, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion DESCRIPTION
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
Package: nanonext
Type: Package
Title: NNG (Nanomsg Next Gen) Lightweight Messaging Library
Version: 1.3.2.9007
Version: 1.3.2.9008
Description: R binding for NNG (Nanomsg Next Gen), a successor to ZeroMQ. NNG is
a socket library implementing 'Scalability Protocols', a reliable,
high-performance standard for common communications patterns including
Expand Down
4 changes: 2 additions & 2 deletions NEWS.md
Original file line number Diff line number Diff line change
@@ -1,9 +1,9 @@
# nanonext 1.3.2.9007 (development)
# nanonext 1.3.2.9008 (development)

#### New Features

* New interface to Pipes moves to using integer pipe IDs rather than Pipe (external pointer) objects:
+ `send_aio()` gains the argument 'pipe' which accepts an integer pipe ID for directed sends (currently only supported by Sockets using the 'poly' protocol).
+ `send()` and `send_aio()` gain the argument 'pipe' which accepts an integer pipe ID for directed sends (currently only supported by Sockets using the 'poly' protocol).
+ A 'recvAio' now records the integer pipe ID, where successful, at `$aio` upon resolution.
+ Pipe objects (of class 'nanoPipe') are obsoleted.
* Adds `monitor()` and `read_monitor()` for easy monitoring of connection changes (pipe additons and removals) at a Socket.
Expand Down
2 changes: 0 additions & 2 deletions R/aio.R
Original file line number Diff line number Diff line change
Expand Up @@ -38,8 +38,6 @@
#' @param con a Socket, Context or Stream.
#' @param timeout [default NULL] integer value in milliseconds or NULL, which
#' applies a socket-specific default, usually the same as no timeout.
#' @param pipe [default 0L] only applicable to Sockets using the 'poly' protocol,
#' an integer pipe ID if directing the send via a specific pipe.
#'
#' @return A \sQuote{sendAio} (object of class \sQuote{sendAio}) (invisibly).
#'
Expand Down
6 changes: 4 additions & 2 deletions R/sendrecv.R
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,8 @@
#' FALSE to return immediately even if unsuccessful (e.g. if no connection is
#' available), or else an integer value specifying the maximum time to block
#' in milliseconds, after which the operation will time out.
#' @param pipe [default 0L] only applicable to Sockets using the 'poly'
#' protocol, an integer pipe ID if directing the send via a specific pipe.
#'
#' @return An integer exit code (zero on success).
#'
Expand Down Expand Up @@ -84,8 +86,8 @@
#'
#' @export
#'
send <- function(con, data, mode = c("serial", "raw"), block = NULL)
.Call(rnng_send, con, data, mode, block)
send <- function(con, data, mode = c("serial", "raw"), block = NULL, pipe = 0L)
.Call(rnng_send, con, data, mode, block, pipe)

#' Receive
#'
Expand Down
5 changes: 4 additions & 1 deletion man/send.Rd

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

4 changes: 2 additions & 2 deletions man/send_aio.Rd

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

11 changes: 6 additions & 5 deletions src/aio.c
Original file line number Diff line number Diff line change
Expand Up @@ -440,17 +440,18 @@ SEXP rnng_send_aio(SEXP con, SEXP data, SEXP mode, SEXP timeout, SEXP pipe, SEXP
if ((xc = nng_msg_alloc(&msg, 0)))
goto exitlevel1;

if (pipeid) {
nng_pipe p;
p.id = (uint32_t) pipeid;
nng_msg_set_pipe(msg, p);
}

if ((xc = nng_msg_append(msg, buf.buf, buf.cur)) ||
(xc = nng_aio_alloc(&saio->aio, saio_complete, saio))) {
nng_msg_free(msg);
goto exitlevel1;
}

if (pipeid) {
nng_pipe p;
p.id = (uint32_t) pipeid;
nng_msg_set_pipe(msg, p);
}
nng_aio_set_msg(saio->aio, msg);
nng_aio_set_timeout(saio->aio, dur);
sock ? nng_send_aio(*(nng_socket *) NANO_PTR(con), saio->aio) :
Expand Down
59 changes: 19 additions & 40 deletions src/comms.c
Original file line number Diff line number Diff line change
Expand Up @@ -305,61 +305,33 @@ SEXP rnng_listener_close(SEXP listener) {

// send and recv ---------------------------------------------------------------

SEXP rnng_send(SEXP con, SEXP data, SEXP mode, SEXP block) {
SEXP rnng_send(SEXP con, SEXP data, SEXP mode, SEXP block, SEXP pipe) {

const int flags = block == R_NilValue ? NNG_DURATION_DEFAULT : TYPEOF(block) == LGLSXP ? 0 : nano_integer(block);
nano_buf buf;
int xc;
int sock, xc;

const SEXP ptrtag = NANO_TAG(con);
if (ptrtag == nano_SocketSymbol) {
if ((sock = ptrtag == nano_SocketSymbol) || ptrtag == nano_ContextSymbol) {

const int pipeid = nano_integer(pipe);
nano_encodes(mode) == 2 ? nano_encode(&buf, data) : nano_serialize(&buf, data, NANO_PROT(con));
nng_socket *sock = (nng_socket *) NANO_PTR(con);
nng_msg *msgp;

if (flags <= 0) {

xc = nng_send(*sock, buf.buf, buf.cur, flags ? NNG_FLAG_NONBLOCK : (NANO_INTEGER(block) != 1) * NNG_FLAG_NONBLOCK);
NANO_FREE(buf);

} else {

nng_msg *msgp;
nng_aio *aiop;

if ((xc = nng_msg_alloc(&msgp, 0)))
goto exitlevel1;

if ((xc = nng_msg_append(msgp, buf.buf, buf.cur)) ||
(xc = nng_aio_alloc(&aiop, NULL, NULL))) {
nng_msg_free(msgp);
goto exitlevel1;
if (pipeid) {
nng_pipe p;
p.id = (uint32_t) pipeid;
nng_msg_set_pipe(msgp, p);
}

nng_aio_set_msg(aiop, msgp);
nng_aio_set_timeout(aiop, flags);
nng_send_aio(*sock, aiop);
NANO_FREE(buf);
nng_aio_wait(aiop);
if ((xc = nng_aio_result(aiop)))
nng_msg_free(nng_aio_get_msg(aiop));
nng_aio_free(aiop);

}

} else if (ptrtag == nano_ContextSymbol) {

nano_encodes(mode) == 2 ? nano_encode(&buf, data) : nano_serialize(&buf, data, NANO_PROT(con));
nng_ctx *ctxp = (nng_ctx *) NANO_PTR(con);
nng_msg *msgp;

if (flags <= 0) {

if ((xc = nng_msg_alloc(&msgp, 0)))
goto exitlevel1;

if ((xc = nng_msg_append(msgp, buf.buf, buf.cur)) ||
(xc = nng_ctx_sendmsg(*ctxp, msgp, flags ? NNG_FLAG_NONBLOCK : (NANO_INTEGER(block) != 1) * NNG_FLAG_NONBLOCK))) {
(xc = sock ? nng_sendmsg(*(nng_socket *) NANO_PTR(con), msgp, flags ? NNG_FLAG_NONBLOCK : (NANO_INTEGER(block) != 1) * NNG_FLAG_NONBLOCK) :
nng_ctx_sendmsg(*(nng_ctx *) NANO_PTR(con), msgp, flags ? NNG_FLAG_NONBLOCK : (NANO_INTEGER(block) != 1) * NNG_FLAG_NONBLOCK))) {
nng_msg_free(msgp);
goto exitlevel1;
}
Expand All @@ -373,6 +345,12 @@ SEXP rnng_send(SEXP con, SEXP data, SEXP mode, SEXP block) {
if ((xc = nng_msg_alloc(&msgp, 0)))
goto exitlevel1;

if (pipeid) {
nng_pipe p;
p.id = (uint32_t) pipeid;
nng_msg_set_pipe(msgp, p);
}

if ((xc = nng_msg_append(msgp, buf.buf, buf.cur)) ||
(xc = nng_aio_alloc(&aiop, NULL, NULL))) {
nng_msg_free(msgp);
Expand All @@ -381,7 +359,8 @@ SEXP rnng_send(SEXP con, SEXP data, SEXP mode, SEXP block) {

nng_aio_set_msg(aiop, msgp);
nng_aio_set_timeout(aiop, flags);
nng_ctx_send(*ctxp, aiop);
sock ? nng_send_aio(*(nng_socket *) NANO_PTR(con), aiop) :
nng_ctx_send(*(nng_ctx *) NANO_PTR(con), aiop);
NANO_FREE(buf);
nng_aio_wait(aiop);
if ((xc = nng_aio_result(aiop)))
Expand Down
2 changes: 1 addition & 1 deletion src/init.c
Original file line number Diff line number Diff line change
Expand Up @@ -175,7 +175,7 @@ static const R_CallMethodDef callMethods[] = {
{"rnng_recv", (DL_FUNC) &rnng_recv, 4},
{"rnng_recv_aio", (DL_FUNC) &rnng_recv_aio, 6},
{"rnng_request", (DL_FUNC) &rnng_request, 7},
{"rnng_send", (DL_FUNC) &rnng_send, 4},
{"rnng_send", (DL_FUNC) &rnng_send, 5},
{"rnng_send_aio", (DL_FUNC) &rnng_send_aio, 6},
{"rnng_serial_config", (DL_FUNC) &rnng_serial_config, 4},
{"rnng_set_marker", (DL_FUNC) &rnng_set_marker, 0},
Expand Down
2 changes: 1 addition & 1 deletion src/nanonext.h
Original file line number Diff line number Diff line change
Expand Up @@ -359,7 +359,7 @@ SEXP rnng_reap(SEXP);
SEXP rnng_recv(SEXP, SEXP, SEXP, SEXP);
SEXP rnng_recv_aio(SEXP, SEXP, SEXP, SEXP, SEXP, SEXP);
SEXP rnng_request(SEXP, SEXP, SEXP, SEXP, SEXP, SEXP, SEXP);
SEXP rnng_send(SEXP, SEXP, SEXP, SEXP);
SEXP rnng_send(SEXP, SEXP, SEXP, SEXP, SEXP);
SEXP rnng_send_aio(SEXP, SEXP, SEXP, SEXP, SEXP, SEXP);
SEXP rnng_serial_config(SEXP, SEXP, SEXP, SEXP);
SEXP rnng_set_marker(void);
Expand Down
11 changes: 10 additions & 1 deletion tests/tests.R
Original file line number Diff line number Diff line change
Expand Up @@ -364,17 +364,26 @@ test_true(!wait(cv))
test_true(!wait(cv2))
test_class("errorValue", resp$recv())

test_zero(cv_reset(cv))
test_class("nanoSocket", poly <- socket(protocol = "poly"))
test_class("nanoSocket", poly1 <- socket(protocol = "poly"))
test_class("nanoSocket", poly2 <- socket(protocol = "poly"))
test_class("nanoMonitor", m <- monitor(poly, cv))
test_print(m)
test_zero(listen(poly))
test_null(read_monitor(m))
test_zero(dial(poly1))
test_zero(dial(poly2))
test_true(wait(cv))
test_true(wait(cv))
test_equal(length(pipes <- read_monitor(m)), 2L)
test_zero(send_aio(poly, "one", timeout = 500, pipe = pipes[1L])[])
test_zero(send(poly, "two", block = 500, pipe = pipes[2L]))
test_type("character", recv(poly1, block = 500))
test_type("character", recv(poly2, block = 500))
test_zero(reap(poly2))
test_zero(reap(poly1))
test_true(!wait(cv))
test_true(wait(cv))
test_type("integer", read_monitor(m))
test_zero(reap(poly))

Expand Down
Loading