
Infinite memory consumption when infinite client reconnection #4218

Closed
Detect1ve opened this issue Jun 15, 2021 · 13 comments

Comments

@Detect1ve

Issue description

There are two processes, each of which creates a ZeroMQ context and sockets. In the second process, these objects are repeatedly created and deleted; in the first process I see memory growth (it looks like a memory leak).

Environment

  • libzmq version (commit hash if unreleased): 4.3.2
  • OS: Ubuntu 20.04.2 LTS 64-bit

Minimal test code / Steps to reproduce the issue

Simple example without checks for returned error codes:

#include <stdio.h>
#include <unistd.h>
#include <sys/wait.h>
#include <zmq.h>

#define CLIENTS_COUNT 2

typedef struct Server
{
    void *req_sock;
} Server;

typedef struct Client
{
    void *req_sock;
} Client;

struct Ipc
{
    void *zmq_ctx;
    void *pub_event_socket;
    void *event_socket;
    int ipc_my_id;
    struct Server servers[CLIENTS_COUNT];
    struct Client clients[CLIENTS_COUNT];
};

/* Create the ZeroMQ context, the PUB socket used to publish events and the
 * SUB socket used to receive them. */
void create_zmq(struct Ipc *const context)
{
    char sock_name[43] = {0};
    int const immediate = 1;
    int const max_msg_in_event_queue = 2200;
    int const zmq_linger_value = 10000;

    context->zmq_ctx = zmq_ctx_new();

    snprintf(sock_name, sizeof(sock_name), "ipc:///tmp/ipc_events_from_%d.sock",
        context->ipc_my_id);

    context->pub_event_socket = zmq_socket(context->zmq_ctx, ZMQ_PUB);

    zmq_setsockopt(context->pub_event_socket, ZMQ_IMMEDIATE, &immediate,
        sizeof(immediate));

    zmq_setsockopt(context->pub_event_socket, ZMQ_LINGER, &zmq_linger_value,
        sizeof(zmq_linger_value));

    zmq_setsockopt(context->pub_event_socket, ZMQ_SNDHWM, &max_msg_in_event_queue,
        sizeof(max_msg_in_event_queue));

    zmq_setsockopt(context->pub_event_socket, ZMQ_RCVHWM, &max_msg_in_event_queue,
        sizeof(max_msg_in_event_queue));

    zmq_bind(context->pub_event_socket, sock_name);

    context->event_socket = zmq_socket(context->zmq_ctx, ZMQ_SUB);

    zmq_setsockopt(context->event_socket, ZMQ_IMMEDIATE, &immediate, sizeof(immediate));

    zmq_setsockopt(context->event_socket, ZMQ_LINGER, &zmq_linger_value,
        sizeof(zmq_linger_value));

    zmq_setsockopt(context->event_socket, ZMQ_SNDHWM, &max_msg_in_event_queue,
        sizeof(max_msg_in_event_queue));

    zmq_setsockopt(context->event_socket, ZMQ_RCVHWM, &max_msg_in_event_queue,
        sizeof(max_msg_in_event_queue));
}

/* Connect the SUB socket to the other peers' event endpoints and create a
 * pair of DEALER request sockets per peer (one connecting, one bound). */
void tune_zmq(struct Ipc *const context)
{
    char sock_name[58] = {0};
    int const max_msg_in_request_queue_for_send = 99;
    int const max_msg_in_request_queue_for_recv = 99;
    int const immediate = 1;
    int const zmq_linger_value = 10000;

    for (int i = 0; i < CLIENTS_COUNT; i++)
    {
        if (i == context->ipc_my_id)
        {
            continue;
        }

        snprintf(sock_name, sizeof(sock_name), "ipc:///tmp/ipc_events_from_%d.sock", i);

        zmq_connect(context->event_socket, sock_name);

        context->servers[i].req_sock = zmq_socket(context->zmq_ctx, ZMQ_DEALER);

        snprintf(sock_name, sizeof(sock_name),
            "ipc:///tmp/ipc_requests_to_%d_from_%d.sock", i, context->ipc_my_id);

        zmq_setsockopt(context->servers[i].req_sock, ZMQ_LINGER, &zmq_linger_value,
            sizeof(zmq_linger_value));

        zmq_setsockopt(context->servers[i].req_sock, ZMQ_IMMEDIATE, &immediate,
            sizeof(immediate));

        zmq_setsockopt(context->servers[i].req_sock, ZMQ_SNDHWM,
            &max_msg_in_request_queue_for_send,
            sizeof(max_msg_in_request_queue_for_send));

        zmq_setsockopt(context->servers[i].req_sock, ZMQ_RCVHWM,
            &max_msg_in_request_queue_for_recv,
            sizeof(max_msg_in_request_queue_for_recv));

        zmq_connect(context->servers[i].req_sock, sock_name);

        context->clients[i].req_sock = zmq_socket(context->zmq_ctx, ZMQ_DEALER);

        snprintf(sock_name, sizeof(sock_name),
            "ipc:///tmp/ipc_requests_to_%d_from_%d.sock", context->ipc_my_id, i);

        zmq_setsockopt(context->clients[i].req_sock, ZMQ_LINGER, &zmq_linger_value,
            sizeof(zmq_linger_value));

        zmq_setsockopt(context->clients[i].req_sock, ZMQ_IMMEDIATE, &immediate,
            sizeof(immediate));

        zmq_setsockopt(context->clients[i].req_sock, ZMQ_SNDHWM,
            &max_msg_in_request_queue_for_send,
            sizeof(max_msg_in_request_queue_for_send));

        zmq_setsockopt(context->clients[i].req_sock, ZMQ_RCVHWM,
            &max_msg_in_request_queue_for_recv,
            sizeof(max_msg_in_request_queue_for_recv));

        zmq_bind(context->clients[i].req_sock, sock_name);
    }
}

/* Close all sockets and destroy the context. */
void delete_zmq(struct Ipc *const context)
{
    if (!context)
    {
        return;
    }

    if (context->pub_event_socket)
    {
        zmq_close(context->pub_event_socket);

        context->pub_event_socket = NULL;
    }

    if (context->event_socket)
    {
        zmq_close(context->event_socket);

        context->event_socket = NULL;
    }

    for (int i = 0; i < CLIENTS_COUNT; i++)
    {
        if (i == context->ipc_my_id)
        {
            continue;
        }

        if (context->servers[i].req_sock)
        {
            zmq_close(context->servers[i].req_sock);

            context->servers[i].req_sock = NULL;
        }

        if (context->clients[i].req_sock)
        {
            zmq_close(context->clients[i].req_sock);

            context->clients[i].req_sock = NULL;
        }
    }

    if (context->zmq_ctx)
    {
        zmq_ctx_destroy(context->zmq_ctx);

        context->zmq_ctx = NULL;
    }
}

int main(void)
{
    pid_t pid = fork();
    if (pid > 0)
    {
        /* First process (parent): creates its resources once and keeps them
         * alive until the child exits; cleanup is deliberately disabled below. */
        int status = 0;
        struct Ipc context_first = {0};

        context_first.ipc_my_id = 0;

        create_zmq(&context_first);

        tune_zmq(&context_first);

        wait(&status);
#if 0
        delete_zmq(&context_first);
#endif
    }
    else if (0 == pid)
    {
        /* Second process (child): repeatedly creates and destroys its
         * resources, reconnecting to the first process each time. */
        for (int i = 0; i < 10; i++)
        {
            struct Ipc context_second = {0};

            context_second.ipc_my_id = 1;

            create_zmq(&context_second);

            tune_zmq(&context_second);

            sleep(1);

            delete_zmq(&context_second);
        }
    }
    else
    {
        fprintf(stderr, "fork creation failed!\n");
    }
}

What's the actual result? (include assertion message & call stack if applicable)

If we start only one process, which just creates the context and sockets but doesn't clean anything up after itself, we get the following report:

HEAP SUMMARY:
    in use at exit: 41,664 bytes in 72 blocks
  total heap usage: 742 allocs, 670 frees, 186,517 bytes allocated

LEAK SUMMARY:
   definitely lost: 0 bytes in 0 blocks
   indirectly lost: 0 bytes in 0 blocks
     possibly lost: 640 bytes in 2 blocks
   still reachable: 41,024 bytes in 70 blocks
        suppressed: 0 bytes in 0 blocks

If I add the second process, which creates and deletes its resources in a loop, then the memory consumption of the first process increases:

HEAP SUMMARY:
    in use at exit: 1,750,904 bytes in 492 blocks
  total heap usage: 1,964 allocs, 1,472 frees, 3,160,817 bytes allocated

LEAK SUMMARY:
   definitely lost: 0 bytes in 0 blocks
   indirectly lost: 0 bytes in 0 blocks
     possibly lost: 640 bytes in 2 blocks
   still reachable: 1,750,264 bytes in 490 blocks
                      of which reachable via heuristic:
                        multipleinheritance: 21,120 bytes in 16 blocks
        suppressed: 0 bytes in 0 blocks

Accordingly, there is a clear dependency: the more often resources are created and deleted in the second process, the more the memory consumption of the first process grows.

What's the expected result?

Everything points to the memory growth being unbounded, so the process will eventually run out of memory. Can you tell me what the problem is?

@bluca
Member

bluca commented Jun 15, 2021

I'm not sure I'm following: you are creating resources without clearing them on termination, so obviously valgrind reports them as leaks. What is the issue here, precisely?

@Detect1ve
Author

Detect1ve commented Jun 15, 2021

In the first case the reported leak is smaller than in the case where resources are also created and cleaned up in the other process.
Not cleaning up in the first process is a deliberate choice, to show that its memory keeps growing.

@bluca
Member

bluca commented Jun 15, 2021

I'm still not following, sorry - you are allocating more resources, so you get more memory. Again, what is the issue?

@Detect1ve
Author

Memory allocation and deallocation occur in the second process, yet memory grows in the first.

@bluca
Member

bluca commented Jun 15, 2021

Yes, because of the IPC connections you are making

@Detect1ve
Author

But why doesn't it free the resources once the connection has been dropped?
If you emulate a long-running server, it will simply run out of memory after a certain number of reconnections.

@bluca
Member

bluca commented Jun 15, 2021

Because you are keeping it open - if you want to drop it, close the sockets/etc. Again, I'm not sure what the problem is.
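
For illustration, a minimal sketch of what closing and re-creating the bound request sockets could look like in the reproducer above, assuming the same Ipc struct, CLIENTS_COUNT and socket options (error handling is omitted, as in the rest of the example; the PUB and connecting sockets could be recycled the same way):

/* Hypothetical helper: close and re-create the bound per-peer DEALER
 * sockets so that pipes accumulated from departed peers are released.
 * Note: with the ipc transport the old endpoint file must be gone before
 * the new bind can succeed; error checks are omitted here. */
void recycle_request_sockets(struct Ipc *const context)
{
    char sock_name[58] = {0};
    int const hwm = 99;
    int const immediate = 1;
    int const zmq_linger_value = 10000;

    for (int i = 0; i < CLIENTS_COUNT; i++)
    {
        if (i == context->ipc_my_id || !context->clients[i].req_sock)
        {
            continue;
        }

        /* Closing the socket tears down its pipes and frees their queues. */
        zmq_close(context->clients[i].req_sock);

        context->clients[i].req_sock = zmq_socket(context->zmq_ctx, ZMQ_DEALER);

        zmq_setsockopt(context->clients[i].req_sock, ZMQ_LINGER,
            &zmq_linger_value, sizeof(zmq_linger_value));

        zmq_setsockopt(context->clients[i].req_sock, ZMQ_IMMEDIATE, &immediate,
            sizeof(immediate));

        zmq_setsockopt(context->clients[i].req_sock, ZMQ_SNDHWM, &hwm, sizeof(hwm));

        zmq_setsockopt(context->clients[i].req_sock, ZMQ_RCVHWM, &hwm, sizeof(hwm));

        snprintf(sock_name, sizeof(sock_name),
            "ipc:///tmp/ipc_requests_to_%d_from_%d.sock", context->ipc_my_id, i);

        /* Re-bind to the same ipc endpoint so peers can simply reconnect. */
        zmq_bind(context->clients[i].req_sock, sock_name);
    }
}

This trades a brief window in which peers cannot connect for releasing the accumulated pipe memory, which is exactly the objection raised in the next comment.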

@Detect1ve
Author

If I close the sockets, the server will stop working.
Or do I need to close and reopen the sockets whenever memory runs out?
That option doesn't seem suitable, since there may be several clients connected at once.

@bluca
Member

bluca commented Jun 15, 2021

Yes, if you want to keep it running, it requires memory. I mean, it can't really run on thin air, it needs CPU and memory resources. Your machines need to have enough resources to handle the amount of traffic you expect. There is no issue or memory leak here.

@bluca bluca closed this as completed Jun 15, 2021
@Detect1ve
Author

@bluca

Okay, my mistake.
My deepest apologies.
This is not a memory leak, because once all resources are destroyed, all memory is freed. It looks like unbounded memory growth instead.
I checked other issues and found your comment #3470 (comment), but it didn't help in my case.
Please explain why you attribute this memory growth to Linux and the allocator. I am trying to use tcmalloc instead of the standard allocator, but the problem is not resolved.
With the standard allocator I see the following: my program allocates a large chunk of memory, and after a while the amount of memory still keeps growing.
With tcmalloc I see lower memory usage, but I can see right away that the amount of memory is growing.

valgrind showed me this result:

328,000 bytes in 20 blocks are still reachable in loss record 82 of 82
   at 0x4C31DFB: malloc (vg_replace_malloc.c:309)
   by 0x40A366D: allocate_chunk (yqueue.hpp:189)
   by 0x40A366D: yqueue_t (yqueue.hpp:68)
   by 0x40A366D: zmq::pipepair(zmq::object_t**, zmq::pipe_t**, int*, bool*) (ypipe.hpp:51)
   by 0x40AD5FE: zmq::session_base_t::process_attach(zmq::i_engine*) (session_base.cpp:398)
   by 0x409E429: zmq::object_t::process_command(zmq::command_t&) (object.cpp:97)
   by 0x4093A13: zmq::io_thread_t::in_event() (io_thread.cpp:88)
   by 0x409217B: zmq::epoll_t::loop() [clone .part.11] (epoll.cpp:206)
   by 0x40C350C: thread_routine (thread.cpp:182)
   by 0x504E6DA: start_thread (pthread_create.c:463)
   by 0x5EAD71E: clone (clone.S:95)

If I increase the number of connections, I see many more records like this in valgrind.
Is there anything I can do about this memory growth?

I can only assume that ZMQ_ZERO_COPY_RECV didn't help because in my case there is only a connection, without any messages being sent.

I would be very glad if you could help me.
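
For reference, ZMQ_ZERO_COPY_RECV is a context option (available since libzmq 4.3.0) and has to be set on the context before the sockets are created; a minimal sketch of how it would be applied to create_zmq in the reproducer (only an illustration, and as noted above it did not change the behaviour here):

void create_zmq(struct Ipc *const context)
{
    context->zmq_ctx = zmq_ctx_new();

    /* Disable zero-copy receive: received frames are copied into their own
     * buffers instead of keeping the engine's larger receive buffers alive. */
    zmq_ctx_set(context->zmq_ctx, ZMQ_ZERO_COPY_RECV, 0);

    /* ... the rest of the socket setup stays as in the original example ... */
}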

@Detect1ve changed the title from "Memory leak" to "Infinite memory consumption when infinite client reconnection" on Jun 22, 2021
@Detect1ve
Author

@bluca
Ping.

@joakim-brannstrom

@Detect1ve you may have to tell the allocator to actually shrink/free memory back to the OS. I use malloc_trim(0) when I need to do so.
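
For reference, malloc_trim is a glibc extension declared in <malloc.h>; a minimal sketch of calling it in the process whose memory keeps growing (where exactly to call it, and whether anything is actually released, depends on the application and the allocator):

#include <malloc.h>  /* glibc extension: malloc_trim() */

/* Ask glibc to release free heap memory back to the OS where possible.
 * Returns 1 if some memory was released, 0 if nothing could be trimmed. */
if (malloc_trim(0) == 0)
{
    fprintf(stderr, "malloc_trim(0) released no memory\n");
}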

@Detect1ve
Author

@joakim-brannstrom
Thank you for your attention!
Unfortunately, it didn't help. In my case, malloc_trim(0) always returns 0. You can use the code attached to this issue to check, as I might be wrong somewhere.
