Skip to content

Commit

Permalink
Merge pull request #402 from fredoboulo/zpoller-tests
Browse files Browse the repository at this point in the history
Zpoller test
  • Loading branch information
trevorbernard authored Feb 27, 2017
2 parents 1878614 + acf963d commit f7a9d0d
Show file tree
Hide file tree
Showing 3 changed files with 191 additions and 24 deletions.
77 changes: 67 additions & 10 deletions src/main/java/org/zeromq/ZPoller.java
Original file line number Diff line number Diff line change
Expand Up @@ -230,7 +230,7 @@ public EventsHandler handler()
/******************************************************************************/
/* 0MQ socket events */
/******************************************************************************/
/**
/*
* These values can be ORed to specify what we want to poll for.
*/
public static final int POLLIN = Poller.POLLIN;
Expand All @@ -242,6 +242,10 @@ public EventsHandler handler()
public static final int OUT = POLLOUT;
public static final int ERR = POLLERR;

// same values with semantic consistency
public static final int READABLE = POLLIN;
public static final int WRITABLE = POLLOUT;

/**
* Creates a new poller based on the current one.
* This will be a shadow poller, sharing the same selector and items creator.
Expand All @@ -265,9 +269,21 @@ public ZPoller(final Selector selector)
}

/**
* Creates a new poller based on the current one.
* This will be a shadow poller, sharing the same selector.
* The global events handler will not be shared.
* Creates a new poller attached to a given context that will provide
* selector for operational polling.
*
* @param context
* the context that will provide the selector to use for polling.
*/
public ZPoller(final ZContext context)
{
this(new SimpleCreator(), context.createSelector());
}

/**
* Creates a new poller based on the current one. This will be a shadow
* poller, sharing the same selector. The global events handler will not be
* shared.
*
* @param creator the items creator
* @param poller the main poller.
Expand All @@ -277,6 +293,20 @@ public ZPoller(final ItemCreator creator, final ZPoller poller)
this(creator, poller.selector);
}

/**
* Creates a new poller attached to a given context that will provide
* selector for operational polling.
*
* @param creator
* the items creator
* @param context
* the context that will provide the selector to use for polling.
*/
public ZPoller(final ItemCreator creator, final ZContext context)
{
this(creator, context.createSelector());
}

/**
* Creates a new poller.
*
Expand Down Expand Up @@ -425,9 +455,7 @@ public final boolean unregister(final Object socketOrChannel)
* happens; if 0, it will return immediately;
* otherwise, it will wait for at most that many
* milliseconds/microseconds (see above).
*
* @see "http://api.zeromq.org/3-0:zmq-poll"
*
* @return how many objects where signaled by poll ()
*/
public int poll(final long timeout)
Expand All @@ -441,7 +469,6 @@ public int poll(final long timeout)
* @param timeout the timeout, as per zmq_poll ();
* @param dispatchEvents true to dispatch events using items handler and the global one.
* @see "http://api.zeromq.org/3-0:zmq-poll"
*
* @return how many objects where signaled by poll ()
*/
protected int poll(final long timeout, final boolean dispatchEvents)
Expand Down Expand Up @@ -571,13 +598,23 @@ public boolean readable(final Socket socket)
// checks for read event
public boolean readable(final Object socketOrChannel)
{
final zmq.PollItem it = filter(socketOrChannel, IN);
final zmq.PollItem it = filter(socketOrChannel, READABLE);
if (it == null) {
return false;
}
return it.isReadable();
}

public boolean pollin(final Socket socket)
{
return isReadable(socket);
}

public boolean pollin(final SelectableChannel channel)
{
return isReadable(channel);
}

/**
* Tells if a channel is writable from this poller.
*
Expand Down Expand Up @@ -613,13 +650,23 @@ public boolean writable(final Socket socket)
// checks for write event
public boolean writable(final Object socketOrChannel)
{
final zmq.PollItem it = filter(socketOrChannel, OUT);
final zmq.PollItem it = filter(socketOrChannel, WRITABLE);
if (it == null) {
return false;
}
return it.isWritable();
}

public boolean pollout(final Socket socket)
{
return isWritable(socket);
}

public boolean pollout(final SelectableChannel channel)
{
return isWritable(channel);
}

/**
* Tells if a channel is in error from this poller.
*
Expand Down Expand Up @@ -662,6 +709,16 @@ public boolean error(final Object socketOrChannel)
return it.isError();
}

public boolean pollerr(final Socket socket)
{
return isError(socket);
}

public boolean pollerr(final SelectableChannel channel)
{
return isError(channel);
}

/**
* Destroys the poller. Does actually nothing.
*/
Expand Down Expand Up @@ -781,7 +838,7 @@ protected zmq.PollItem filter(final Object socketOrChannel, int events)
final Iterable<ItemHolder> items = items(socketOrChannel);
for (ItemHolder item : items) {
final zmq.PollItem it = item.item();
if ((it.interestOps() & events) > 0) {
if ((it.zinterestOps() & events) > 0) {
return it;
}
}
Expand Down
7 changes: 6 additions & 1 deletion src/main/java/zmq/PollItem.java
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ private void init(int ops)
interest |= SelectionKey.OP_READ;
}
if ((ops & ZMQ.ZMQ_POLLOUT) > 0) {
if (socket != null) { // ZMQ Socket get readiness from the mailbox
if (socket != null) { // ZMQ Socket get readiness from the mailbox
interest |= SelectionKey.OP_READ;
}
else {
Expand Down Expand Up @@ -92,6 +92,11 @@ public final int interestOps()
return interest;
}

public final int zinterestOps()
{
return zinterest;
}

public final int interestOps(int ops)
{
init(ops);
Expand Down
131 changes: 118 additions & 13 deletions src/test/java/org/zeromq/TestZPoller.java
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

import java.io.IOException;
import java.nio.channels.SelectableChannel;
import java.util.concurrent.atomic.AtomicReference;

import org.junit.Assert;
import org.junit.Test;
Expand All @@ -10,48 +11,152 @@

public class TestZPoller
{
private static class EventsHandlerAdapter implements EventsHandler
{
@Override
public boolean events(SelectableChannel channel, int events)
{
return false;
}

@Override
public boolean events(Socket socket, int events)
{
return false;
}
}

static class Server extends Thread
{
private final int port;
private final Socket socket;
private final ZPoller poller;

public Server(ZContext context, int port)
{
this.port = port;
socket = context.createSocket(ZMQ.PUSH);
poller = new ZPoller(context);
}

@Override
public void run()
{
socket.bind("tcp://127.0.0.1:" + port);

poller.register(socket, ZPoller.WRITABLE);
while (!Thread.currentThread().isInterrupted()) {
poller.poll(-1);
if (poller.isWritable(socket)) {
ZMsg msg = new ZMsg();
msg.add("OK");
msg.send(socket);

break;
}
else {
Assert.fail("unable to get server socket in writable state");
}
}
socket.close();

try {
poller.close();
}
catch (IOException e) {
e.printStackTrace();
Assert.fail("error while closing poller " + e.getMessage());
}
}
}

@Test
public void testPollerPollout() throws IOException, InterruptedException
{
final int port = Utils.findOpenPort();

final ZContext context = new ZContext();
final ZPoller poller = new ZPoller(context.createSelector());
final ZMQ.Socket receiver = context.createSocket(ZMQ.PULL);

final Server client = new Server(context, port);
client.start();

try {
receiver.connect("tcp://127.0.0.1:" + port);

final AtomicReference<ZMsg> msg = new AtomicReference<>();
poller.register(receiver, new EventsHandlerAdapter()
{
@Override
public boolean events(Socket s, int events)
{
if (receiver.equals(s)) {
msg.set(ZMsg.recvMsg(receiver));
return false;
}
else {
return true;
}
}
}, ZPoller.IN);
int maxAttempts = 100;
while (!Thread.currentThread().isInterrupted() && maxAttempts-- > 0) {
int rc = poller.poll(1000);
if (rc < 0) {
break;
}
}
client.join();
Assert.assertNotNull("unable to receive msg after several cycles", msg);
}
finally {
receiver.close();
context.close();
poller.close();
}
}

@Test
public void testUseNull() throws IOException
{
ZContext ctx = null; //new ZContext();
ZPoller poller = new ZPoller(new ZStar.VerySimpleSelectorCreator().create());

SelectableChannel channel = null;
Socket socket = null; //ctx.createSocket(ZMQ.SUB);
Socket socket = null; // ctx.createSocket(ZMQ.SUB);

boolean rc = false;
rc = poller.register(socket, ZPoller.IN);
Assert.assertFalse("Registering a null socket was successful", rc);
Assert.assertFalse("Registering a null socket was successful", rc);
rc = poller.register(channel, ZPoller.OUT);
Assert.assertFalse("Registering a null channel was successful", rc);
Assert.assertFalse("Registering a null channel was successful", rc);

int events = poller.poll(10);
Assert.assertEquals("reading event on without sockets", 0, events);
Assert.assertEquals("reading event on without sockets", 0, events);

rc = poller.isReadable(socket);
Assert.assertFalse("checking read event on a null socket was successful", rc);
Assert.assertFalse("checking read event on a null socket was successful", rc);
rc = poller.writable(socket);
Assert.assertFalse("checking write event on a null socket was successful", rc);
Assert.assertFalse("checking write event on a null socket was successful", rc);

rc = poller.readable(channel);
Assert.assertFalse("checking read event on a null channel was successful", rc);
Assert.assertFalse("checking read event on a null channel was successful", rc);
rc = poller.isWritable(channel);
Assert.assertFalse("checking write event on a null channel was successful", rc);
Assert.assertFalse("checking write event on a null channel was successful", rc);

EventsHandler global = null;

poller.setGlobalHandler(global);

EventsHandler handler = null;
rc = poller.register(socket, handler, ZPoller.ERR);
Assert.assertFalse("Register with handler on a null socket was successful", rc);
Assert.assertFalse("Register with handler on a null socket was successful", rc);
rc = poller.register(channel, ZPoller.ERR);
Assert.assertFalse("Register with handler on a null channel was successful", rc);
Assert.assertFalse("Register with handler on a null channel was successful", rc);

events = poller.poll(10);
Assert.assertEquals("reading event with events handlers without sockets", 0, events);
Assert.assertEquals("reading event with events handlers without sockets", 0, events);

poller.close();
System.out.println();
}
}

0 comments on commit f7a9d0d

Please sign in to comment.