
Connecting to remote Kafka server #17

Closed
gataky opened this issue Apr 18, 2013 · 3 comments


gataky commented Apr 18, 2013

I've been playing with your module (awesome job, by the way!) with everything running locally. I now have a chance to use it with a remote Kafka server, but I'm running into some problems. Perhaps you can help me.

from kafka.client import KafkaClient
from kafka.producer import SimpleProducer

client = KafkaClient('x.x.x.x', 9092)
producer = SimpleProducer(client, 'test')

Up to this point everything works fine; I don't see any tracebacks or any error messages on the Kafka server. However, when I try to send a message with producer.send_messages('this is a test') I get a traceback:

----> 1 producer.send_messages('this is a test')

/usr/local/lib/python2.7/dist-packages/kafka/producer.pyc in send_messages(self, *msg)
     20         req = ProduceRequest(self.topic, self.next_partition.next(),
     21             messages=[create_message(m) for m in msg])
---> 22         resp = self.client.send_produce_request([req])[0]
     23         assert resp.error == 0

/usr/local/lib/python2.7/dist-packages/kafka/client.pyc in send_produce_request(self, payloads, acks, timeout, fail_on_error, callback)
    164         resps = self._send_broker_aware_request(payloads,
    165                     partial(KafkaProtocol.encode_produce_request, acks=acks, timeout=timeout),
--> 166                     KafkaProtocol.decode_produce_response)
    167         out = []
    168         for resp in resps:

/usr/local/lib/python2.7/dist-packages/kafka/client.pyc in _send_broker_aware_request(self, payloads, encoder_fn, decoder_fn)
    123         # For each broker, send the list of request payloads
    124         for broker, payloads in payloads_by_broker.items():
--> 125             conn = self._get_conn_for_broker(broker)
    126             requestId = self._next_id()
    127             request = encoder_fn(client_id=KafkaClient.CLIENT_ID, correlation_id=requestId, payloads=payloads)

/usr/local/lib/python2.7/dist-packages/kafka/client.pyc in _get_conn_for_broker(self, broker)
     39         "Get or create a connection to a broker"
     40         if (broker.host, broker.port) not in self.conns:
---> 41             self.conns[(broker.host, broker.port)] = KafkaConnection(broker.host, broker.port, self.bufsize)
     42         return self.conns[(broker.host, broker.port)]
     43 

/usr/local/lib/python2.7/dist-packages/kafka/conn.pyc in __init__(self, host, port, bufsize)
     19         self.bufsize = bufsize
     20         self._sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
---> 21         self._sock.connect((host, port))
     22         self._sock.settimeout(10)
     23 

/usr/lib/python2.7/socket.pyc in meth(name, self, *args)
    222 
    223 def meth(name,self,*args):
--> 224     return getattr(self._sock,name)(*args)
    225 
    226 for _m in _socketmethods:

error: [Errno 111] Connection refused

I did a little digging in the client object's client.brokers and saw {0: BrokerMetadata(nodeId=0, host='0.0.0.0', port=9092)}

I'm not sure why the host is 0.0.0.0 rather than the server's actual address.

So I tried

from kafka.common import BrokerMetadata
newBroker = BrokerMetadata(0, 'x.x.x.x', 9092)
client.brokers[0] = newBroker
producer = SimpleProducer(client, 'test')
producer.send_messages('this is a test')

and got the same traceback as before
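For anyone hitting the same thing: my guess (an assumption about the client internals, not verified against the library) is that the client also caches a topic/partition → broker mapping when it loads metadata, so swapping the entry in client.brokers leaves that second cache still pointing at the stale 0.0.0.0 record. A toy model of that caching pattern, with stand-in names:

```python
from collections import namedtuple

# Simplified stand-ins for the client's metadata structures (assumed shapes).
BrokerMetadata = namedtuple("BrokerMetadata", ["nodeId", "host", "port"])
TopicAndPartition = namedtuple("TopicAndPartition", ["topic", "partition"])

stale = BrokerMetadata(0, "0.0.0.0", 9092)

# A metadata load would fill both caches with the same BrokerMetadata value.
brokers = {0: stale}
topics_to_brokers = {TopicAndPartition("test", 0): stale}

# Patching only the broker table, as I did above...
brokers[0] = BrokerMetadata(0, "x.x.x.x", 9092)

# ...leaves the per-partition cache holding the old 0.0.0.0 entry, which is
# the one a produce request would look up.
print(topics_to_brokers[TopicAndPartition("test", 0)].host)  # → 0.0.0.0
```

If that's what's happening, no amount of patching client.brokers after the fact would help, which matches the identical traceback.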

I'm still a little new to Kafka, so it's probably something trivial. My current setup is ZooKeeper, a Kafka server with one broker, and Kafka's console consumer all running on a remote box1; I'm running your module from another box2. Do I need to have something else running on box2? I can telnet from box2 to box1 on that host:port with no problems. I can also call client.close() on the client and watch Kafka on box1 log Closing socket connection to x.x.x.x, so KafkaClient is connecting to it.

Both boxes are running Ubuntu 12.04 and Python 2.7.
box1 has Kafka 0.8.
box2 has the head of your repo.


gataky commented Apr 18, 2013

I put a breakpoint in your conn.py right before line 21:

self._sock.connect((host, port))

and changed host to the IP it's supposed to point to, and messages look like they're being sent now.

It looks like host is defaulting to 0.0.0.0 somewhere.

mumrah (Collaborator) commented Apr 18, 2013

Check the Kafka metadata in ZooKeeper. The method _send_broker_aware_request looks up which broker a request should be sent to, so if Kafka itself is registered as "localhost" or "0.0.0.0" in ZK, then that's what the client will think to use.
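In Kafka 0.8 the broker registers itself in ZooKeeper under /brokers/ids/&lt;broker.id&gt;, and the hostname it registers comes from the host.name setting in config/server.properties. A sketch of the relevant fragment (illustrative values only, reusing the x.x.x.x placeholder from the issue):

```properties
# config/server.properties on the Kafka box (illustrative values)
broker.id=0

# The hostname the broker binds to and registers in ZooKeeper.
# If this comes out as 0.0.0.0 or localhost, remote clients are handed
# back an address they cannot reach from another machine.
host.name=x.x.x.x

port=9092
```

You can confirm what is actually registered by opening the ZooKeeper shell and running get /brokers/ids/0.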

If you turn on debug-level logging, you'll see when the client makes a metadata request to the broker.


gataky commented Apr 18, 2013

Ah, I see. Working now. Thanks so much for your help!
