ZMQ_XPUB_MANUAL subscriptions not properly cleaned up #2601

Closed
mrseanmorris opened this issue Jun 21, 2017 · 3 comments · Fixed by #2603

Comments

@mrseanmorris

Hi there,

I'm working on implementing a service that uses the ZMQ_XPUB_MANUAL feature, and I've come across what I think is a bug in how subscriptions are cleaned up. The problem happens when a client doesn't cleanly unsubscribe from its topics and instead either closes the socket without cleaning up or simply crashes.

After this happens, the subscriptions seem to get out of sync, and clients that subsequently connect and subscribe receive messages on topics to which they did not subscribe.

Here's a simple reproducer that shows the problem:

#include <cassert>
#include <iostream>
#include <string>

#include <zmq.h>

int main( int argc, char * argv[] )
{
    int rc = 0;
    int manual = 1;

    void *ctx = zmq_ctx_new();
    assert (ctx);

    void *publisher = zmq_socket (ctx, ZMQ_XPUB);
    assert (publisher);
    rc = zmq_setsockopt (publisher, ZMQ_XPUB_MANUAL, &manual, sizeof(manual));
    assert (rc==0);
    rc = zmq_bind (publisher, "tcp://*:9990");
    assert (rc==0);

    void *receiver = zmq_socket (ctx, ZMQ_XSUB);
    assert (receiver);
    rc = zmq_connect (receiver, "tcp://127.0.0.1:9990");
    assert (rc==0);

    // send a subscription for A
    char subscription[2] = {0x01, 'A'};
    rc = zmq_send (receiver, subscription, 2, 0);
    assert (rc==2);

    // receive the subscription, overwrite it to XA
    char buffer[2];
    rc = zmq_recv (publisher, buffer, 2, 0);
    assert (rc==2);
    assert (buffer[0] == 0x01);
    assert (buffer[1] == 'A');
    rc = zmq_setsockopt (publisher, ZMQ_SUBSCRIBE, "XA", 2);
    assert (rc==0);

    // send 2 messages
    rc = zmq_send_const (publisher, "XA", 2, 0);
    assert (rc==2);
    rc = zmq_send_const (publisher, "XB", 2, 0);
    assert (rc==2);

    // receive the single message
    rc = zmq_recv (receiver, buffer, 2, 0);
    assert (rc==2);
    assert (buffer[0] == 'X');
    assert (buffer[1] == 'A');

    // should be nothing left in the queue
    rc = zmq_recv (receiver, buffer, 2, ZMQ_DONTWAIT);
    assert (rc==-1);

    // close the socket
    rc = zmq_close (receiver);
    assert (rc==0);

    // closing the socket will result in an unsubscribe event
    rc = zmq_recv (publisher, buffer, 2, 0);
    assert (rc==2);
    assert (buffer[0] == 0x00);
    assert (buffer[1] == 'A');

    // this doesn't really do anything: no last_pipe is set, so it just fails silently
    rc = zmq_setsockopt (publisher, ZMQ_UNSUBSCRIBE, "XA", 2);
    assert (rc==0);

    // create another socket
    void *receiver2 = zmq_socket (ctx, ZMQ_XSUB);
    assert (receiver2);
    rc = zmq_connect (receiver2, "tcp://127.0.0.1:9990");
    assert (rc==0);

    // send a subscription for B
    subscription[0] = 0x01;
    subscription[1] = 'B';
    rc = zmq_send (receiver2, subscription, 2, 0);
    assert (rc==2);

    // receive the subscription, overwrite it to XB
    rc = zmq_recv (publisher, buffer, 2, 0);
    assert (rc==2);
    assert (buffer[0] == 0x01);
    assert (buffer[1] == 'B');
    rc = zmq_setsockopt (publisher, ZMQ_SUBSCRIBE, "XB", 2);
    assert (rc==0);

    // send 2 messages
    rc = zmq_send_const (publisher, "XA", 2, 0);
    assert (rc==2);
    rc = zmq_send_const (publisher, "XB", 2, 0);
    assert (rc==2);

    // receive the single message
    rc = zmq_recv (receiver2, buffer, 2, 0);

    // receiver2 is going to get XA, even though it never subscribed to that topic
    std::cout << "Received message [" << std::string(buffer,2) << "]" << std::endl;
    assert (rc==2);
    assert (buffer[0] == 'X');
    assert (buffer[1] == 'B');  // this assertion will fail

    // should be nothing left in the queue
    rc = zmq_recv (receiver2, buffer, 2, ZMQ_DONTWAIT);
    assert (rc==-1);  // this would also fail, if the previous assertion did not

    // clean up
    rc = zmq_close (receiver2);
    assert (rc==0);
    rc = zmq_close (publisher);
    assert (rc==0);
    rc = zmq_ctx_term(ctx);
    assert (rc==0);

}
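
For readers less familiar with the frames the reproducer builds by hand: a subscription message read from an XPUB socket is one command byte (0x01 to subscribe, 0x00 to unsubscribe) followed by the topic bytes. The sketch below decodes such a frame and applies the same "A" to "XA" rewrite the reproducer uses. It does not call libzmq at all; `SubscriptionEvent`, `parse_subscription_frame`, `rewrite_topic` and `check_reproducer_frames` are hypothetical names introduced only for illustration.

```cpp
#include <cassert>
#include <cstddef>
#include <string>

// Hypothetical helper type, not part of libzmq.
struct SubscriptionEvent {
    bool subscribe;     // true for command byte 0x01, false for 0x00
    std::string topic;  // the bytes that follow the command byte
};

// Decode a raw frame as it would be read from an XPUB socket.
// Returns false if the frame is empty or the command byte is not 0 or 1.
bool parse_subscription_frame(const char *data, std::size_t size,
                              SubscriptionEvent &out)
{
    if (data == nullptr || size == 0)
        return false;
    const unsigned char command = static_cast<unsigned char>(data[0]);
    if (command > 1)
        return false;
    out.subscribe = (command == 1);
    out.topic.assign(data + 1, size - 1);
    return true;
}

// Hypothetical rewrite rule mirroring the reproducer ("A" becomes "XA").
std::string rewrite_topic(const std::string &topic)
{
    return "X" + topic;
}

// Walks through the two frames the reproducer sees for topic A.
void check_reproducer_frames()
{
    const char sub[2] = {0x01, 'A'};
    const char unsub[2] = {0x00, 'A'};
    SubscriptionEvent ev;

    assert(parse_subscription_frame(sub, sizeof sub, ev));
    assert(ev.subscribe && rewrite_topic(ev.topic) == "XA");

    assert(parse_subscription_frame(unsub, sizeof unsub, ev));
    assert(!ev.subscribe && rewrite_topic(ev.topic) == "XA");
}
```

In manual mode the application would then pass the rewritten topic to `zmq_setsockopt` with `ZMQ_SUBSCRIBE` or `ZMQ_UNSUBSCRIBE` on the XPUB socket, as the reproducer does.
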
bluca added a commit to bluca/libzmq that referenced this issue Jun 22, 2017
Solution: remove the pipe from the real trie when a peer disconnects.
Also add a unit test that exercises the behaviour by reconnecting
a different socket and sending a message that matches.
Fixes zeromq#2601 and introduced by zeromq#2042
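
As a rough illustration of what "remove the pipe when a peer disconnects" means, here is a toy model of a prefix subscription table. This is only a sketch: it is not libzmq's trie, and `ToySubscriptions`, its methods, `demo_reconnect` and the integer pipe ids are all made up for the example. Reusing id 1 for the second peer is an assumption that stands in for a stale entry ending up associated with a new connection; it is not a claim about how libzmq handles pipes internally.

```cpp
#include <cassert>
#include <map>
#include <set>
#include <string>

// Toy stand-in for a subscription table: topic prefix -> set of pipe ids.
class ToySubscriptions {
public:
    void subscribe(int pipe, const std::string &prefix)
    {
        table_[prefix].insert(pipe);
    }

    // Drop every subscription owned by the given pipe, e.g. on disconnect.
    void remove_pipe(int pipe)
    {
        for (auto it = table_.begin(); it != table_.end();) {
            it->second.erase(pipe);
            if (it->second.empty())
                it = table_.erase(it);  // erase returns the next iterator
            else
                ++it;
        }
    }

    // Return the pipes that hold a subscription whose prefix matches message.
    std::set<int> match(const std::string &message) const
    {
        std::set<int> out;
        for (const auto &entry : table_) {
            const std::string &prefix = entry.first;
            if (message.compare(0, prefix.size(), prefix) == 0)
                out.insert(entry.second.begin(), entry.second.end());
        }
        return out;
    }

private:
    std::map<std::string, std::set<int>> table_;
};

// Mirrors the reproducer: the first peer subscribes to XA and goes away,
// then a second peer (reusing id 1 by assumption) subscribes to XB only.
void demo_reconnect()
{
    ToySubscriptions subs;
    subs.subscribe(1, "XA");
    subs.remove_pipe(1);      // the cleanup step on disconnect
    subs.subscribe(1, "XB");
    assert(subs.match("XA").empty());        // no stale delivery of XA
    assert(subs.match("XB").count(1) == 1);  // XB still reaches the new peer
}
```

Without the `remove_pipe` call, the model would still list pipe 1 under "XA" after the reconnect, which corresponds to the unexpected XA message in the reproducer.
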
@bluca
Member

bluca commented Jun 22, 2017

Thanks for the report and especially for the unit test. Can reproduce, was introduced by: #2042

I'm testing a fix at the moment, will probably send it tomorrow.

@bluca
Member

bluca commented Jun 22, 2017

@mrseanmorris please try again from latest master, should be fixed.

@mrseanmorris
Author

Hi @bluca

Your patch fixed the problem. Thanks for the very fast response!

Cheers,
Sean
