From acf963dbea645127cec43b5e17be5474182e75f0 Mon Sep 17 00:00:00 2001 From: Frederic Delechamp Date: Wed, 22 Feb 2017 15:05:44 +0100 Subject: [PATCH] Fixed ZPoller interestops call + Added tests for Zpoller --- src/main/java/org/zeromq/ZPoller.java | 77 +++++++++++-- src/main/java/zmq/PollItem.java | 7 +- src/test/java/org/zeromq/TestZPoller.java | 131 +++++++++++++++++++--- 3 files changed, 191 insertions(+), 24 deletions(-) diff --git a/src/main/java/org/zeromq/ZPoller.java b/src/main/java/org/zeromq/ZPoller.java index e52b21380..bbf00e685 100644 --- a/src/main/java/org/zeromq/ZPoller.java +++ b/src/main/java/org/zeromq/ZPoller.java @@ -230,7 +230,7 @@ public EventsHandler handler() /******************************************************************************/ /* 0MQ socket events */ /******************************************************************************/ - /** + /* * These values can be ORed to specify what we want to poll for. */ public static final int POLLIN = Poller.POLLIN; @@ -242,6 +242,10 @@ public EventsHandler handler() public static final int OUT = POLLOUT; public static final int ERR = POLLERR; + // same values with semantic consistency + public static final int READABLE = POLLIN; + public static final int WRITABLE = POLLOUT; + /** * Creates a new poller based on the current one. * This will be a shadow poller, sharing the same selector and items creator. @@ -265,9 +269,21 @@ public ZPoller(final Selector selector) } /** - * Creates a new poller based on the current one. - * This will be a shadow poller, sharing the same selector. - * The global events handler will not be shared. + * Creates a new poller attached to a given context that will provide + * selector for operational polling. + * + * @param context + * the context that will provide the selector to use for polling. + */ + public ZPoller(final ZContext context) + { + this(new SimpleCreator(), context.createSelector()); + } + + /** + * Creates a new poller based on the current one. This will be a shadow + * poller, sharing the same selector. The global events handler will not be + * shared. * * @param creator the items creator * @param poller the main poller. @@ -277,6 +293,20 @@ public ZPoller(final ItemCreator creator, final ZPoller poller) this(creator, poller.selector); } + /** + * Creates a new poller attached to a given context that will provide + * selector for operational polling. + * + * @param creator + * the items creator + * @param context + * the context that will provide the selector to use for polling. + */ + public ZPoller(final ItemCreator creator, final ZContext context) + { + this(creator, context.createSelector()); + } + /** * Creates a new poller. * @@ -425,9 +455,7 @@ public final boolean unregister(final Object socketOrChannel) * happens; if 0, it will return immediately; * otherwise, it will wait for at most that many * milliseconds/microseconds (see above). - * * @see "http://api.zeromq.org/3-0:zmq-poll" - * * @return how many objects where signaled by poll () */ public int poll(final long timeout) @@ -441,7 +469,6 @@ public int poll(final long timeout) * @param timeout the timeout, as per zmq_poll (); * @param dispatchEvents true to dispatch events using items handler and the global one. * @see "http://api.zeromq.org/3-0:zmq-poll" - * * @return how many objects where signaled by poll () */ protected int poll(final long timeout, final boolean dispatchEvents) @@ -571,13 +598,23 @@ public boolean readable(final Socket socket) // checks for read event public boolean readable(final Object socketOrChannel) { - final zmq.PollItem it = filter(socketOrChannel, IN); + final zmq.PollItem it = filter(socketOrChannel, READABLE); if (it == null) { return false; } return it.isReadable(); } + public boolean pollin(final Socket socket) + { + return isReadable(socket); + } + + public boolean pollin(final SelectableChannel channel) + { + return isReadable(channel); + } + /** * Tells if a channel is writable from this poller. * @@ -613,13 +650,23 @@ public boolean writable(final Socket socket) // checks for write event public boolean writable(final Object socketOrChannel) { - final zmq.PollItem it = filter(socketOrChannel, OUT); + final zmq.PollItem it = filter(socketOrChannel, WRITABLE); if (it == null) { return false; } return it.isWritable(); } + public boolean pollout(final Socket socket) + { + return isWritable(socket); + } + + public boolean pollout(final SelectableChannel channel) + { + return isWritable(channel); + } + /** * Tells if a channel is in error from this poller. * @@ -662,6 +709,16 @@ public boolean error(final Object socketOrChannel) return it.isError(); } + public boolean pollerr(final Socket socket) + { + return isError(socket); + } + + public boolean pollerr(final SelectableChannel channel) + { + return isError(channel); + } + /** * Destroys the poller. Does actually nothing. */ @@ -781,7 +838,7 @@ protected zmq.PollItem filter(final Object socketOrChannel, int events) final Iterable items = items(socketOrChannel); for (ItemHolder item : items) { final zmq.PollItem it = item.item(); - if ((it.interestOps() & events) > 0) { + if ((it.zinterestOps() & events) > 0) { return it; } } diff --git a/src/main/java/zmq/PollItem.java b/src/main/java/zmq/PollItem.java index b3bbe00e1..be731b214 100644 --- a/src/main/java/zmq/PollItem.java +++ b/src/main/java/zmq/PollItem.java @@ -41,7 +41,7 @@ private void init(int ops) interest |= SelectionKey.OP_READ; } if ((ops & ZMQ.ZMQ_POLLOUT) > 0) { - if (socket != null) { // ZMQ Socket get readiness from the mailbox + if (socket != null) { // ZMQ Socket get readiness from the mailbox interest |= SelectionKey.OP_READ; } else { @@ -92,6 +92,11 @@ public final int interestOps() return interest; } + public final int zinterestOps() + { + return zinterest; + } + public final int interestOps(int ops) { init(ops); diff --git a/src/test/java/org/zeromq/TestZPoller.java b/src/test/java/org/zeromq/TestZPoller.java index 939d53f20..1eec28137 100644 --- a/src/test/java/org/zeromq/TestZPoller.java +++ b/src/test/java/org/zeromq/TestZPoller.java @@ -2,6 +2,7 @@ import java.io.IOException; import java.nio.channels.SelectableChannel; +import java.util.concurrent.atomic.AtomicReference; import org.junit.Assert; import org.junit.Test; @@ -10,33 +11,138 @@ public class TestZPoller { + private static class EventsHandlerAdapter implements EventsHandler + { + @Override + public boolean events(SelectableChannel channel, int events) + { + return false; + } + + @Override + public boolean events(Socket socket, int events) + { + return false; + } + } + + static class Server extends Thread + { + private final int port; + private final Socket socket; + private final ZPoller poller; + + public Server(ZContext context, int port) + { + this.port = port; + socket = context.createSocket(ZMQ.PUSH); + poller = new ZPoller(context); + } + + @Override + public void run() + { + socket.bind("tcp://127.0.0.1:" + port); + + poller.register(socket, ZPoller.WRITABLE); + while (!Thread.currentThread().isInterrupted()) { + poller.poll(-1); + if (poller.isWritable(socket)) { + ZMsg msg = new ZMsg(); + msg.add("OK"); + msg.send(socket); + + break; + } + else { + Assert.fail("unable to get server socket in writable state"); + } + } + socket.close(); + + try { + poller.close(); + } + catch (IOException e) { + e.printStackTrace(); + Assert.fail("error while closing poller " + e.getMessage()); + } + } + } + + @Test + public void testPollerPollout() throws IOException, InterruptedException + { + final int port = Utils.findOpenPort(); + + final ZContext context = new ZContext(); + final ZPoller poller = new ZPoller(context.createSelector()); + final ZMQ.Socket receiver = context.createSocket(ZMQ.PULL); + + final Server client = new Server(context, port); + client.start(); + + try { + receiver.connect("tcp://127.0.0.1:" + port); + + final AtomicReference msg = new AtomicReference<>(); + poller.register(receiver, new EventsHandlerAdapter() + { + @Override + public boolean events(Socket s, int events) + { + if (receiver.equals(s)) { + msg.set(ZMsg.recvMsg(receiver)); + return false; + } + else { + return true; + } + } + }, ZPoller.IN); + int maxAttempts = 100; + while (!Thread.currentThread().isInterrupted() && maxAttempts-- > 0) { + int rc = poller.poll(1000); + if (rc < 0) { + break; + } + } + client.join(); + Assert.assertNotNull("unable to receive msg after several cycles", msg); + } + finally { + receiver.close(); + context.close(); + poller.close(); + } + } + @Test public void testUseNull() throws IOException { - ZContext ctx = null; //new ZContext(); ZPoller poller = new ZPoller(new ZStar.VerySimpleSelectorCreator().create()); SelectableChannel channel = null; - Socket socket = null; //ctx.createSocket(ZMQ.SUB); + Socket socket = null; // ctx.createSocket(ZMQ.SUB); boolean rc = false; rc = poller.register(socket, ZPoller.IN); - Assert.assertFalse("Registering a null socket was successful", rc); + Assert.assertFalse("Registering a null socket was successful", rc); rc = poller.register(channel, ZPoller.OUT); - Assert.assertFalse("Registering a null channel was successful", rc); + Assert.assertFalse("Registering a null channel was successful", rc); int events = poller.poll(10); - Assert.assertEquals("reading event on without sockets", 0, events); + Assert.assertEquals("reading event on without sockets", 0, events); rc = poller.isReadable(socket); - Assert.assertFalse("checking read event on a null socket was successful", rc); + Assert.assertFalse("checking read event on a null socket was successful", rc); rc = poller.writable(socket); - Assert.assertFalse("checking write event on a null socket was successful", rc); + Assert.assertFalse("checking write event on a null socket was successful", rc); rc = poller.readable(channel); - Assert.assertFalse("checking read event on a null channel was successful", rc); + Assert.assertFalse("checking read event on a null channel was successful", rc); rc = poller.isWritable(channel); - Assert.assertFalse("checking write event on a null channel was successful", rc); + Assert.assertFalse("checking write event on a null channel was successful", rc); EventsHandler global = null; @@ -44,14 +150,13 @@ public void testUseNull() throws IOException EventsHandler handler = null; rc = poller.register(socket, handler, ZPoller.ERR); - Assert.assertFalse("Register with handler on a null socket was successful", rc); + Assert.assertFalse("Register with handler on a null socket was successful", rc); rc = poller.register(channel, ZPoller.ERR); - Assert.assertFalse("Register with handler on a null channel was successful", rc); + Assert.assertFalse("Register with handler on a null channel was successful", rc); events = poller.poll(10); - Assert.assertEquals("reading event with events handlers without sockets", 0, events); + Assert.assertEquals("reading event with events handlers without sockets", 0, events); poller.close(); - System.out.println(); } }