-
Notifications
You must be signed in to change notification settings - Fork 484
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Intoduce new API to handle incoming message by passing handler #576
Conversation
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I like this idea! Code looks good to me, as well.
Leaving this open for a bit, to give others a chance to review if they want, but I think this is ready to merge.
src/main/java/org/zeromq/ZMQ.java
Outdated
@Draft | ||
public void recv(Consumer<ZMsg> handler, int flags) | ||
{ | ||
handler.accept(ZMsg.recvMsg(this, flags)); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I would rather have preferred the opposite: a static method in ZMsg with a consumer parameter. ZMQ.Socket is ZMsg-agnostic, and this starts to introduce a dependency. As far as I can see, the logic of ZMQ has been to stack API on top of each other, so users are free to choose at which level they want to operate.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I tried to correct the placement of the API, I wish if there is a documentation about this layering, it's would be great if we have an example, implemented in various layers of the API.
src/main/java/org/zeromq/ZMQ.java
Outdated
* </ul> | ||
*/ | ||
@Draft | ||
public void recv(Consumer<ZMsg> handler, int flags) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Have you considered using a promise-like API, as CompletableFuture, or an additional callback for handling exceptions?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I have added a new method with an exceptionHandler callback, Thanks for your point
src/main/java/org/zeromq/ZMsg.java
Outdated
msg = ZMsg.recvMsg(socket, flags); | ||
} | ||
catch (ZMQException ignore) { | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think if no exception handler is provided, we should just let the exceptions happen so that calling code can handle them.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
you're right, my mistake
src/main/java/org/zeromq/ZMsg.java
Outdated
{ | ||
ZMsg msg = null; | ||
try { | ||
msg = ZMsg.recvMsg(socket, flags); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
With your version, if an exception occurs, handler.accept() will still be called, with a null argument.
Putting the accept inside the try-catch block will remove that behavior.
try {
handler.accept(ZMsg.recvMsg(socket, flags));
} ...
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I suggested getting rid of the try/catch altogether and just letting the exception happen, which he's implemented. I think that's probably the best thing to do, so that the caller can handle the exception when he/she doesn't provide an exception handler.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
My mistake -- this is a different part of the code!
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@esahekmat first of all, thanks for your proposal. I do not want to restrict your free will on the topic, you seem to provide a fair progress in the API, and I definitely support you in your efforts of improving it.
What I'm about to say is much about the lessons learned from my history, where callbacks tend to be accumulating slowly to the point where the code is barely readable, with callbacks within callbacks within callbacks (and is mostly javascript-related). I now tend to use monadic structures, like CompletableFuture (or even better, Observable from Rx), because they allow to chain operations in a much better way and to handle errors more gracefully.
For example, if there was a method:
public static CompletableFuture<ZMsg> recv(Socket socket, int flags)
{
return CompletableFuture.supplyAsync(() -> ZMsg.recvMsg(socket, flags), executor);
}
User could chain calls like that to concatenate the first frames of first 2 messages together:
CompletableFuture<ZMsg> future = ZMsg.recv(socket, 0);
future.thenApply(ZMsg::popString).exceptionally(err -> "ERROR")
.thenCombine(ZMsg.recv(socket, 0), (string, msg) -> {
return string + msg.popString();
}).thenAccept(string -> {
System.out.println(string);
});
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Hi @fredoboulo
I agree with your point of view, designing a new API is challenging and community collaboration on it helps a lot to introduce better API. actually this is the beauty of open source community.
Also, I know about callback hell, and it would be great to use a CompletableFuture instead of callback but I'm worried supply method inside of supplyAsync will run on another thread, and as I know we should do working with a socket in the same thread that we have created that socket by it before. I think CompletableFuture.supplyAsync try to run its argument in the ForkJoinPool::common pool, and I think it's not an appropriate way to receive the message from the given socket.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I was mostly thinking out loud, not asking to change your way of doing :)
I'm quite glad to see proposal of new API here, let's see what it will bring!
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I'm interested in CompletableFuture pattern, I'm trying to find a way to correctly implement it. I have an idea, we can have a separate top-level class, which has methods to create socket, send and receive messages and so on. This class should have an internal SingleThreadPoolExecutorService to do all the stuff async. and has a internal Queue to store and retrieve commands, there would be some public methods for each task(create socket,receive message, send message, close socket...) and each of these methods just simply put a command in the internal queue, and the internal thread pull commands one by one and execute them. In this approach, we can hide main while loop from clients although I afraid that this middle queue maybe has a bad effect on the performance. What do you think about this?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
In the other words, we can sum up all the above approach in the idea of having a thread-safe socket manager class.
So what is the state of this PR? should I continue working on it or it's cool enough to be merged as a draft API? |
Technically, all this patch needs to satisfy before it's merged is the C4 process, which this does. |
No description provided.