Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Intoduce new API to handle incoming message by passing handler #576

Merged
merged 6 commits into from
Aug 19, 2018
Merged
Show file tree
Hide file tree
Changes from 5 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion src/main/java/org/zeromq/ZMQ.java
Original file line number Diff line number Diff line change
Expand Up @@ -3162,7 +3162,7 @@ public byte[] recv()

/**
* Stream of incoming messages
*
* <p>
* This API is in DRAFT state and is subject to change at ANY time until declared stable
*
* @return infinite stream of the incoming messages
Expand Down
42 changes: 41 additions & 1 deletion src/main/java/org/zeromq/ZMsg.java
Original file line number Diff line number Diff line change
Expand Up @@ -9,8 +9,10 @@
import java.util.Collection;
import java.util.Deque;
import java.util.Iterator;
import java.util.function.Consumer;

import org.zeromq.ZMQ.Socket;
import zmq.util.Draft;

/**
* The ZMsg class provides methods to send and receive multipart messages
Expand Down Expand Up @@ -250,7 +252,45 @@ public static ZMsg recvMsg(Socket socket, int flag)
return msg;
}

/**
/**
* This API is in DRAFT state and is subject to change at ANY time until declared stable
* handle incoming message with a handler
*
* @param socket
* @param flags see ZMQ constants
* @param handler handler to handle incoming message
* @param exceptionHandler handler to handle exceptions
*/
@Draft
public static void recvMsg(ZMQ.Socket socket, int flags,
Consumer<ZMsg> handler,
Consumer<ZMQException> exceptionHandler)
{
ZMsg msg = null;
try {
msg = ZMsg.recvMsg(socket, flags);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

With your version, if an exception occurs, handler.accept() will still be called, with a null argument.
Putting the accept inside the try-catch block will remove that behavior.

try {
   handler.accept(ZMsg.recvMsg(socket, flags));
} ...

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I suggested getting rid of the try/catch altogether and just letting the exception happen, which he's implemented. I think that's probably the best thing to do, so that the caller can handle the exception when he/she doesn't provide an exception handler.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

My mistake -- this is a different part of the code!

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@esahekmat first of all, thanks for your proposal. I do not want to restrict your free will on the topic, you seem to provide a fair progress in the API, and I definitely support you in your efforts of improving it.

What I'm about to say is much about the lessons learned from my history, where callbacks tend to be accumulating slowly to the point where the code is barely readable, with callbacks within callbacks within callbacks (and is mostly javascript-related). I now tend to use monadic structures, like CompletableFuture (or even better, Observable from Rx), because they allow to chain operations in a much better way and to handle errors more gracefully.

For example, if there was a method:

    public static CompletableFuture<ZMsg> recv(Socket socket, int flags)
    {
        return CompletableFuture.supplyAsync(() -> ZMsg.recvMsg(socket, flags), executor);
    }

User could chain calls like that to concatenate the first frames of first 2 messages together:

            CompletableFuture<ZMsg> future = ZMsg.recv(socket, 0);

            future.thenApply(ZMsg::popString).exceptionally(err -> "ERROR")
                    .thenCombine(ZMsg.recv(socket, 0), (string, msg) -> {
                        return string + msg.popString();
                    }).thenAccept(string -> {
                        System.out.println(string);
                    });

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hi @fredoboulo
I agree with your point of view, designing a new API is challenging and community collaboration on it helps a lot to introduce better API. actually this is the beauty of open source community.
Also, I know about callback hell, and it would be great to use a CompletableFuture instead of callback but I'm worried supply method inside of supplyAsync will run on another thread, and as I know we should do working with a socket in the same thread that we have created that socket by it before. I think CompletableFuture.supplyAsync try to run its argument in the ForkJoinPool::common pool, and I think it's not an appropriate way to receive the message from the given socket.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I was mostly thinking out loud, not asking to change your way of doing :)

I'm quite glad to see proposal of new API here, let's see what it will bring!

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm interested in CompletableFuture pattern, I'm trying to find a way to correctly implement it. I have an idea, we can have a separate top-level class, which has methods to create socket, send and receive messages and so on. This class should have an internal SingleThreadPoolExecutorService to do all the stuff async. and has a internal Queue to store and retrieve commands, there would be some public methods for each task(create socket,receive message, send message, close socket...) and each of these methods just simply put a command in the internal queue, and the internal thread pull commands one by one and execute them. In this approach, we can hide main while loop from clients although I afraid that this middle queue maybe has a bad effect on the performance. What do you think about this?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In the other words, we can sum up all the above approach in the idea of having a thread-safe socket manager class.

}
catch (ZMQException e) {
exceptionHandler.accept(e);
}
handler.accept(msg);
}

/**
* This API is in DRAFT state and is subject to change at ANY time until declared stable
* handle incoming message with a handler
*
* @param socket
* @param flags see ZMQ constants
* @param handler handler to handle incoming message
*/
@Draft
public static void recvMsg(ZMQ.Socket socket, int flags, Consumer<ZMsg> handler)
{
handler.accept(ZMsg.recvMsg(socket, flags));
}

/**
* Save message to an open data output stream.
*
* Data saved as:
Expand Down
3 changes: 1 addition & 2 deletions src/test/java/org/zeromq/ZMsgTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -49,8 +49,7 @@ public void testRecvMsg() throws Exception
ZMQ.Context ctx = ZMQ.context(0);
ZMQ.Socket socket = ctx.socket(SocketType.PULL);

ZMsg msg = ZMsg.recvMsg(socket, ZMQ.NOBLOCK);
assertThat(msg, nullValue());
ZMsg.recvMsg(socket, ZMQ.NOBLOCK, (msg)-> assertThat(msg, nullValue()));

socket.close();
ctx.close();
Expand Down