Skip to content

Commit

Permalink
monitor concept
Browse files Browse the repository at this point in the history
  • Loading branch information
shikokuchuo committed Nov 29, 2024
1 parent a235c88 commit f639cfa
Show file tree
Hide file tree
Showing 8 changed files with 201 additions and 5 deletions.
2 changes: 1 addition & 1 deletion DESCRIPTION
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
Package: nanonext
Type: Package
Title: NNG (Nanomsg Next Gen) Lightweight Messaging Library
Version: 1.3.2.9006
Version: 1.3.2.9007
Description: R binding for NNG (Nanomsg Next Gen), a successor to ZeroMQ. NNG is
a socket library implementing 'Scalability Protocols', a reliable,
high-performance standard for common communications patterns including
Expand Down
2 changes: 2 additions & 0 deletions NAMESPACE
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,7 @@ export(listen)
export(lock)
export(mclock)
export(messenger)
export(monitor)
export(msleep)
export(nano)
export(ncurl)
Expand All @@ -81,6 +82,7 @@ export(opt)
export(parse_url)
export(pipe_notify)
export(random)
export(read_monitor)
export(reap)
export(recv)
export(recv_aio)
Expand Down
3 changes: 2 additions & 1 deletion NEWS.md
Original file line number Diff line number Diff line change
@@ -1,11 +1,12 @@
# nanonext 1.3.2.9006 (development)
# nanonext 1.3.2.9007 (development)

#### New Features

* New interface to Pipes moves to using integer pipe IDs rather than Pipe (external pointer) objects:
+ `send_aio()` gains the argument 'pipe' which accepts an integer pipe ID for directed sends (currently only supported by Sockets using the 'poly' protocol).
+ A 'recvAio' now records the integer pipe ID, where successful, at `$aio` upon resolution.
+ Pipe objects (of class 'nanoPipe') are obsoleted.
* Adds `monitor()` and `read_monitor()` for easy monitoring of connection changes (pipe additons and removals) at a Socket.

#### Updates

Expand Down
42 changes: 42 additions & 0 deletions R/socket.R
Original file line number Diff line number Diff line change
Expand Up @@ -189,3 +189,45 @@ close.nanoSocket <- function(con, ...) invisible(.Call(rnng_close, con))
#' @export
#'
reap <- function(con) .Call(rnng_reap, con)

#' Monitor a Socket for Pipe Changes
#'
#' This function monitors pipe additions and removals from a socket.
#'
#' @param sock a Socket.
#' @param cv a conditionVariable.
#'
#' @return For monitor: an external pointer. \cr
#' For read_monitor: an integer vector of pipe IDs (positive if added,
#' negative if removed), or else integer zero if there were no changes since
#' the previous read.
#'
#' @examples
#' cv <- cv()
#' s <- socket("poly")
#' s1 <- socket("poly")
#'
#' m <- monitor(s, cv)
#' read_monitor(m)
#'
#' listen(s)
#' dial(s1)
#'
#' cv_value(cv)
#' read_monitor(m)
#'
#' close(s)
#' close(s1)
#'
#' read_monitor(m)
#'
#' @export
#'
monitor <- function(sock, cv) .Call(rnng_monitor_create, sock, cv)

#' @param x an external pointer to a monitor.
#'
#' @rdname monitor
#' @export
#'
read_monitor <- function(x) .Call(rnng_monitor_read, x)
47 changes: 47 additions & 0 deletions man/monitor.Rd

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

6 changes: 4 additions & 2 deletions src/init.c
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ SEXP nano_DotcallSymbol;
SEXP nano_HeadersSymbol;
SEXP nano_IdSymbol;
SEXP nano_ListenerSymbol;
SEXP nano_PipeSymbol;
SEXP nano_MonitorSymbol;
SEXP nano_ProtocolSymbol;
SEXP nano_ResolveSymbol;
SEXP nano_ResponseSymbol;
Expand Down Expand Up @@ -72,7 +72,7 @@ static void RegisterSymbols(void) {
nano_HeadersSymbol = Rf_install("headers");
nano_IdSymbol = Rf_install("id");
nano_ListenerSymbol = Rf_install("listener");
nano_PipeSymbol = Rf_install("pipe");
nano_MonitorSymbol = Rf_install("monitor");
nano_ProtocolSymbol = Rf_install("protocol");
nano_ResolveSymbol = Rf_install("resolve");
nano_ResponseSymbol = Rf_install("response");
Expand Down Expand Up @@ -160,6 +160,8 @@ static const R_CallMethodDef callMethods[] = {
{"rnng_listener_close", (DL_FUNC) &rnng_listener_close, 1},
{"rnng_listener_start", (DL_FUNC) &rnng_listener_start, 1},
{"rnng_messenger", (DL_FUNC) &rnng_messenger, 1},
{"rnng_monitor_create", (DL_FUNC) &rnng_monitor_create, 2},
{"rnng_monitor_read", (DL_FUNC) &rnng_monitor_read, 1},
{"rnng_ncurl", (DL_FUNC) &rnng_ncurl, 9},
{"rnng_ncurl_aio", (DL_FUNC) &rnng_ncurl_aio, 9},
{"rnng_ncurl_session", (DL_FUNC) &rnng_ncurl_session, 8},
Expand Down
11 changes: 10 additions & 1 deletion src/nanonext.h
Original file line number Diff line number Diff line change
Expand Up @@ -193,6 +193,13 @@ typedef struct nano_cv_duo_s {
nano_cv *cv2;
} nano_cv_duo;

typedef struct nano_monitor_s {
nano_cv *cv;
int *ids;
int size;
int updates;
} nano_monitor;

typedef struct nano_thread_aio_s {
nng_thread *thr;
nano_cv *cv;
Expand Down Expand Up @@ -243,7 +250,7 @@ extern SEXP nano_DotcallSymbol;
extern SEXP nano_HeadersSymbol;
extern SEXP nano_IdSymbol;
extern SEXP nano_ListenerSymbol;
extern SEXP nano_PipeSymbol;
extern SEXP nano_MonitorSymbol;
extern SEXP nano_ProtocolSymbol;
extern SEXP nano_ResolveSymbol;
extern SEXP nano_ResponseSymbol;
Expand Down Expand Up @@ -337,6 +344,8 @@ SEXP rnng_listener_close(SEXP);
SEXP rnng_listener_start(SEXP);
SEXP rnng_messenger(SEXP);
SEXP rnng_messenger_thread_create(SEXP);
SEXP rnng_monitor_create(SEXP, SEXP);
SEXP rnng_monitor_read(SEXP);
SEXP rnng_ncurl(SEXP, SEXP, SEXP, SEXP, SEXP, SEXP, SEXP, SEXP, SEXP);
SEXP rnng_ncurl_aio(SEXP, SEXP, SEXP, SEXP, SEXP, SEXP, SEXP, SEXP, SEXP);
SEXP rnng_ncurl_session(SEXP, SEXP, SEXP, SEXP, SEXP, SEXP, SEXP, SEXP);
Expand Down
93 changes: 93 additions & 0 deletions src/sync.c
Original file line number Diff line number Diff line change
Expand Up @@ -160,6 +160,31 @@ static void pipe_cb_dropcon(nng_pipe p, nng_pipe_ev ev, void *arg) {

}

static void pipe_cb_monitor(nng_pipe p, nng_pipe_ev ev, void *arg) {

nano_monitor *monitor = (nano_monitor *) arg;

nano_cv *ncv = monitor->cv;
nng_cv *cv = ncv->cv;
nng_mtx *mtx = ncv->mtx;

const int id = (int) p.id;
if (!id)
return;

nng_mtx_lock(mtx);
if (monitor->updates >= monitor->size) {
monitor->size += 8;
monitor->ids = R_Realloc(monitor->ids, monitor->size, int);
}
monitor->ids[monitor->updates] = ev == NNG_PIPE_EV_ADD_POST ? id : -id;
monitor->updates++;
ncv->condition++;
nng_cv_wake(cv);
nng_mtx_unlock(mtx);

}

// finalizers ------------------------------------------------------------------

static void cv_duo_finalizer(SEXP xptr) {
Expand All @@ -184,6 +209,15 @@ static void request_finalizer(SEXP xptr) {

}

static void monitor_finalizer(SEXP xptr) {

if (NANO_PTR(xptr) == NULL) return;
nano_monitor *xp = (nano_monitor *) NANO_PTR(xptr);
R_Free(xp->ids);
R_Free(xp);

}

// synchronization primitives --------------------------------------------------

SEXP rnng_cv_alloc(void) {
Expand Down Expand Up @@ -639,3 +673,62 @@ SEXP rnng_interrupt_switch(void) {
return R_NilValue;

}

// monitors --------------------------------------------------------------------

SEXP rnng_monitor_create(SEXP socket, SEXP cv) {

if (NANO_TAG(socket) != nano_SocketSymbol)
Rf_error("'socket' is not a valid Socket");

if (NANO_TAG(cv) != nano_CvSymbol)
Rf_error("'cv' is not a valid Condition Variable");

const int n = 8;
nano_monitor *monitor = R_Calloc(1, nano_monitor);
monitor->ids = R_Calloc(n, int);
monitor->size = n;
monitor->cv = (nano_cv *) NANO_PTR(cv);
nng_socket *sock = (nng_socket *) NANO_PTR(socket);

int xc;

if ((xc = nng_pipe_notify(*sock, NNG_PIPE_EV_ADD_POST, pipe_cb_monitor, monitor)))
ERROR_OUT(xc);

if ((xc = nng_pipe_notify(*sock, NNG_PIPE_EV_REM_POST, pipe_cb_monitor, monitor)))
ERROR_OUT(xc);

SEXP xptr = R_MakeExternalPtr(monitor, nano_MonitorSymbol, R_NilValue);
R_RegisterCFinalizerEx(xptr, monitor_finalizer, TRUE);

return xptr;

}

SEXP rnng_monitor_read(SEXP x) {

if (NANO_TAG(x) != nano_MonitorSymbol)
Rf_error("'x' is not a valid Monitor");

nano_monitor *monitor = (nano_monitor *) NANO_PTR(x);

nano_cv *ncv = monitor->cv;
nng_mtx *mtx = ncv->mtx;

SEXP out;
nng_mtx_lock(mtx);
const int updates = monitor->updates;
if (updates) {
out = Rf_allocVector(INTSXP, updates);
memcpy(NANO_DATAPTR(out), monitor->ids, updates * sizeof(int));
monitor->updates = 0;
}
nng_mtx_unlock(mtx);

if (!updates)
out = nano_success;

return out;

}

0 comments on commit f639cfa

Please sign in to comment.