Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Intoduce new API to handle incoming message by passing handler #576

Merged
merged 6 commits into from
Aug 19, 2018

Conversation

isahekmat
Copy link
Contributor

No description provided.

@coveralls
Copy link

coveralls commented Jul 17, 2018

Coverage Status

Coverage decreased (-0.1%) to 83.53% when pulling 80aed61 on esahekmat:master into 796ec58 on zeromq:master.

Copy link
Contributor

@daveyarwood daveyarwood left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I like this idea! Code looks good to me, as well.

Leaving this open for a bit, to give others a chance to review if they want, but I think this is ready to merge.

@Draft
public void recv(Consumer<ZMsg> handler, int flags)
{
handler.accept(ZMsg.recvMsg(this, flags));
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I would rather have preferred the opposite: a static method in ZMsg with a consumer parameter. ZMQ.Socket is ZMsg-agnostic, and this starts to introduce a dependency. As far as I can see, the logic of ZMQ has been to stack API on top of each other, so users are free to choose at which level they want to operate.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I tried to correct the placement of the API, I wish if there is a documentation about this layering, it's would be great if we have an example, implemented in various layers of the API.

* </ul>
*/
@Draft
public void recv(Consumer<ZMsg> handler, int flags)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Have you considered using a promise-like API, as CompletableFuture, or an additional callback for handling exceptions?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I have added a new method with an exceptionHandler callback, Thanks for your point

msg = ZMsg.recvMsg(socket, flags);
}
catch (ZMQException ignore) {
}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think if no exception handler is provided, we should just let the exceptions happen so that calling code can handle them.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

you're right, my mistake

{
ZMsg msg = null;
try {
msg = ZMsg.recvMsg(socket, flags);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

With your version, if an exception occurs, handler.accept() will still be called, with a null argument.
Putting the accept inside the try-catch block will remove that behavior.

try {
   handler.accept(ZMsg.recvMsg(socket, flags));
} ...

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I suggested getting rid of the try/catch altogether and just letting the exception happen, which he's implemented. I think that's probably the best thing to do, so that the caller can handle the exception when he/she doesn't provide an exception handler.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

My mistake -- this is a different part of the code!

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@esahekmat first of all, thanks for your proposal. I do not want to restrict your free will on the topic, you seem to provide a fair progress in the API, and I definitely support you in your efforts of improving it.

What I'm about to say is much about the lessons learned from my history, where callbacks tend to be accumulating slowly to the point where the code is barely readable, with callbacks within callbacks within callbacks (and is mostly javascript-related). I now tend to use monadic structures, like CompletableFuture (or even better, Observable from Rx), because they allow to chain operations in a much better way and to handle errors more gracefully.

For example, if there was a method:

    public static CompletableFuture<ZMsg> recv(Socket socket, int flags)
    {
        return CompletableFuture.supplyAsync(() -> ZMsg.recvMsg(socket, flags), executor);
    }

User could chain calls like that to concatenate the first frames of first 2 messages together:

            CompletableFuture<ZMsg> future = ZMsg.recv(socket, 0);

            future.thenApply(ZMsg::popString).exceptionally(err -> "ERROR")
                    .thenCombine(ZMsg.recv(socket, 0), (string, msg) -> {
                        return string + msg.popString();
                    }).thenAccept(string -> {
                        System.out.println(string);
                    });

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hi @fredoboulo
I agree with your point of view, designing a new API is challenging and community collaboration on it helps a lot to introduce better API. actually this is the beauty of open source community.
Also, I know about callback hell, and it would be great to use a CompletableFuture instead of callback but I'm worried supply method inside of supplyAsync will run on another thread, and as I know we should do working with a socket in the same thread that we have created that socket by it before. I think CompletableFuture.supplyAsync try to run its argument in the ForkJoinPool::common pool, and I think it's not an appropriate way to receive the message from the given socket.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I was mostly thinking out loud, not asking to change your way of doing :)

I'm quite glad to see proposal of new API here, let's see what it will bring!

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm interested in CompletableFuture pattern, I'm trying to find a way to correctly implement it. I have an idea, we can have a separate top-level class, which has methods to create socket, send and receive messages and so on. This class should have an internal SingleThreadPoolExecutorService to do all the stuff async. and has a internal Queue to store and retrieve commands, there would be some public methods for each task(create socket,receive message, send message, close socket...) and each of these methods just simply put a command in the internal queue, and the internal thread pull commands one by one and execute them. In this approach, we can hide main while loop from clients although I afraid that this middle queue maybe has a bad effect on the performance. What do you think about this?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In the other words, we can sum up all the above approach in the idea of having a thread-safe socket manager class.

@isahekmat
Copy link
Contributor Author

So what is the state of this PR? should I continue working on it or it's cool enough to be merged as a draft API?

@trevorbernard
Copy link
Member

Technically, all this patch needs to satisfy before it's merged is the C4 process, which this does.

@trevorbernard trevorbernard merged commit 174c232 into zeromq:master Aug 19, 2018
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

5 participants