Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Segv for message that cannot be sent on kafka #225

Open
persona94 opened this issue Oct 17, 2023 · 1 comment
Open

Segv for message that cannot be sent on kafka #225

persona94 opened this issue Oct 17, 2023 · 1 comment

Comments

@persona94
Copy link

persona94 commented Oct 17, 2023

I have a server where my producer is not authorized to send a message on kafka. I use async send and the program crashes after the 1st send

My code to send

{
       std::unique_lock<std::mutex> lk(recordMapMutex);
       auto iter = recordMap.emplace(std::piecewise_construct,
           std::forward_as_tuple(data->messageId),
           std::forward_as_tuple(data->topic, kafka::Key{ data->key.c_str(), data->key.size() }, kafka::Value{ data->event.c_str(), data->event.size() }));
       rec = &iter.first->second;
       std::string prdIdStr = "producerId";
       rec->headers() = { { prdIdStr, kafka::Value(instanceId.c_str(), instanceId.size()) } };
}

Here, data is a class is passed in as a shared ptr. For now the shared pointer is pushed to a map and never removed, so the memory is valid even after the message is sent.

Here is the async send

    producer->send(
        *rec,
        [&, data, cb, rec](const RecordMetadata& metadata, const Error& error)
        {
            {
                // Disabled on purpose
                // std::unique_lock<std::mutex> lk(recordMapMutex);
                // recordMap.erase(data->messageId);
            }

            if (error)
            {
                std::cout < < "send hit error, calling CB" << std::endl;
                cb(data, metadata.toString(), error.toString());
            }
            else
            {
               std::cout << "No error" << std::endl;
                cb(data, metadata.toString(), "");
            }
        });

cb is a callback function that takes in a shared_ptr to the data that was passed in, and 2 strings

The error I get from kafka is

Error Broker: Topic authorization failed [29] sending message

Some logs from kafka

KafkaProducer[feae661e-a7d61a65] NOINFO | [thrd:main]: Topic test-topic metadata information unknown
KafkaProducer[feae661e-a7d61a65] NOINFO | [thrd:main]: Topic test-topic partition count is zero: should refresh metadata
KafkaProducer[feae661e-a7d61a65] METADATA | [thrd:main]: Requesting metadata for 1/1 topics: refresh unavailable topics
KafkaProducer[feae661e-a7d61a65] METADATA | [thrd:main]: ssl://kafka-kafka-bootstrap.kafka:9093/bootstrap: Request metadata for 1 topic(s): refresh unavailable topics
KafkaProducer[feae661e-a7d61a65] SEND | [thrd:ssl://kafka-kafka-bootstrap.kafka:9093/bootstrap]: ssl://kafka-kafka-bootstrap.kafka:9093/bootstrap: Sent MetadataRequest (v4, 57 bytes @ 0, CorrId 3)
KafkaProducer[feae661e-a7d61a65] RECV | [thrd:ssl://kafka-kafka-bootstrap.kafka:9093/bootstrap]: ssl://kafka-kafka-bootstrap.kafka:9093/bootstrap: Received MetadataResponse (v4, 284 bytes, CorrId 3, rtt 1.55ms)
KafkaProducer[feae661e-a7d61a65] METADATA | [thrd:main]: ssl://kafka-kafka-bootstrap.kafka:9093/bootstrap: ===== Received metadata (for 1 requested topics): refresh unavailable topics =====
KafkaProducer[feae661e-a7d61a65] METADATA | [thrd:main]: ssl://kafka-kafka-bootstrap.kafka:9093/bootstrap: ClusterId: MvZ7D3v5Qx2GezBHiz26Vg, ControllerId: 0
KafkaProducer[feae661e-a7d61a65] METADATA | [thrd:main]: ssl://kafka-kafka-bootstrap.kafka:9093/bootstrap: 3 brokers, 1 topics
KafkaProducer[feae661e-a7d61a65] METADATA | [thrd:main]: ssl://kafka-kafka-bootstrap.kafka:9093/bootstrap:   Broker #0/3: kafka-kafka-0.kafka-kafka-brokers.kafka.svc.k8s-test:9093 NodeId 0
KafkaProducer[feae661e-a7d61a65] METADATA | [thrd:main]: ssl://kafka-kafka-bootstrap.kafka:9093/bootstrap:   Broker #1/3: kafka-kafka-2.kafka-kafka-brokers.kafka.svc.k8s-test:9093 NodeId 2
KafkaProducer[feae661e-a7d61a65] METADATA | [thrd:main]: ssl://kafka-kafka-bootstrap.kafka:9093/bootstrap:   Broker #2/3: kafka-kafka-1.kafka-kafka-brokers.kafka.svc.k8s-test:9093 NodeId 1
KafkaProducer[feae661e-a7d61a65] METADATA | [thrd:main]: ssl://kafka-kafka-bootstrap.kafka:9093/bootstrap:   Topic #0/1: test-topic with 0 partitions: Broker: Topic authorization failed
KafkaProducer[feae661e-a7d61a65] METADATA | [thrd:main]: Error in metadata reply for topic test-topic (PartCnt 0): Broker: Topic authorization failed
KafkaProducer[feae661e-a7d61a65] TOPICERROR | [thrd:main]: Topic test-topic has permanent error: Broker: Topic authorization failed
KafkaProducer[feae661e-a7d61a65] STATE | [thrd:main]: Topic test-topic changed state unknown -> error
KafkaProducer[feae661e-a7d61a65] PARTCNT | [thrd:main]: Failing all 1 unassigned messages in topic test-topic due to permanent topic error: Broker: Topic authorization failed
KafkaProducer[feae661e-a7d61a65] UAS | [thrd:main]: 0/1 messages were partitioned in topic test-topic
KafkaProducer[feae661e-a7d61a65] UAS | [thrd:main]: 1/1 messages failed partitioning in topic test-topic
KafkaProducer[feae661e-a7d61a65] METADATA | [thrd:main]: ssl://kafka-kafka-bootstrap.kafka:9093/bootstrap: 1/1 requested topic(s) seen in metadata

back trace from the core file

#0  0x0000000000f5a360 in rd_list_destroy ()
#1  0x0000000000f091b9 in rd_kafka_headers_destroy ()
#2  0x0000000000ea4fc4 in rd_kafka_produceva ()
#3  0x00000000008c0268 in kafka::clients::producer::KafkaProducer::send(kafka::clients::producer::ProducerRecord const&, std::function<void (kafka::clients::producer::RecordMetadata const&, kafka::Error const&)> const&, kafka::clients::producer::KafkaProducer::SendOption, kafka::clients::producer::KafkaProducer::ActionWhileQueueIsFull) (this=<optimized out>, record=..., deliveryCb=..., option=<optimized out>, action=<optimized out>) at /opt/rh/gcc-toolset-11/root/usr/include/c++/11/bits/stl_vector.h:918
@persona94
Copy link
Author

Hello, is anyone able to look into this?

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

No branches or pull requests

1 participant