-
Notifications
You must be signed in to change notification settings - Fork 3.6k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
PIP-1 - Introduce Pulsar proxy component #548
Conversation
@@ -59,7 +62,7 @@ public BinaryProtoLookupService(PulsarClientImpl client, String serviceUrl, bool | |||
* @param destination: topic-name | |||
* @return broker-socket-address that serves given topic | |||
*/ | |||
public CompletableFuture<InetSocketAddress> getBroker(DestinationName destination) { | |||
public CompletableFuture<Pair<InetSocketAddress, InetSocketAddress>> getBroker(DestinationName destination) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
do you think using InetSocketAddress[] would be better to avoid creation of Pair object here.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
In this case it would be the same number of object allocations, either a Pair<>
or the InetSocketAddress[]
. No differences, except Pair
conveys better the fact that 2 addresses are going to be there.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
yes, I meant avoiding creation of Pair
.
conveys better the fact that 2 addresses are going to be there.
Yes, make sense.
@@ -77,6 +78,8 @@ | |||
private final int maxNumberOfRejectedRequestPerConnection; | |||
private final int rejectedRequestResetTimeSec = 60; | |||
|
|||
private String targetBrokerAddress = null; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
do you think if we can add proxy
in the name it will easy to understand.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Sure, will rename that
@@ -188,6 +197,11 @@ message CommandLookupTopicResponse { | |||
optional bool authoritative = 5 [default = false]; | |||
optional ServerError error = 6; | |||
optional string message = 7; | |||
|
|||
// If it's true, indicates to the client that it must |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
format?
// Original principal that was verified by | ||
// a Pulsar proxy. In this case the auth info above | ||
// will the the auth of the proxy itself | ||
optional string original_principal = 7; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
if proxy is going to do auth&auth for original-principal then broker will not do any validation on this original_principal
right? if that's true then are we passing it for debugging only? and I don't see if we are setting it in Commands.java
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Proxy is not doing authorization on the data connections. The reason is that after the initial connect/connected it just forward everything on both sides.
The broker still needs to check whether that particular role is authorized to pulblish on a specific topic. So the proxy relays the original client principal, and that will be used by the broker for the authorization, trusting that the proxy has already validated the authentication.
if (future.isSuccess()) { | ||
outboundChannel.read(); | ||
} else { | ||
inboundChannel.close(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
should we log warn message here?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Fixed
public void operationComplete(Future<Void> future) throws Exception { | ||
// This is invoked when the write operation on the paired connection is completed | ||
if (future.isSuccess()) { | ||
outboundChannel.read(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
but we have already triggered read
on outboundChannel
on line-107, so channel-handler will continue reading even if we don't call here. right?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
No, the auto-read on the channel is set to false. The reason is to adapt to the speed of the slowest connection.
So, we read 1 buffer from c1
, write into c2
. When the write is completed (meaning we were able to write on the socket because the TCP send window was open), then we read the next buffer.
brokerURI = new URI(brokerServiceUrl); | ||
} catch (URISyntaxException e) { | ||
proxyConnection.ctx().writeAndFlush( | ||
Commands.newLookupErrorResponse(ServerError.ServiceNotReady, e.getMessage(), clientRequestId)); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
ServerError.MetadataError
?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Makes more sense. Fixed
} | ||
|
||
private void close() { | ||
ctx.close(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
should we have additional state= 'ConnectionClosed` ?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Fixed
@rdhabalia Addressed comments |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
LGTM .. just minor comments.
# | ||
|
||
# Zookeeper quorum connection string (comma-separated) | ||
zookeeperServers=127.0.0.1:2181 |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
in other conf file we have kept it empty. should we do the same?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yes, I forgot these after testing
private final ZooKeeperCache localZkCache; | ||
private final LocalZooKeeperConnectionService localZkConnectionSvc; | ||
|
||
private final ZooKeeperDataCache<LoadReport> brokerInfo; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
org.apache.pulsar.policies.data.loadbalancer.ServiceLookupData
because ModularLoadManager
generates LocalBrokerData
.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Fixed
DefaultExports.initialize(); | ||
server.addServlet("/metrics", new ServletHolder(MetricsServlet.class)); | ||
|
||
// server.addServlet("/*", DiscoveryServiceServlet.class, initParameters); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
does this comment require for any indication?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
forgot this, will remove
this.serviceUrlTls = String.format("pulsar://%s:%d/", hostname, proxyConfig.getServicePortTls()); | ||
|
||
EventLoopGroup acceptorEventLoop, workersEventLoop; | ||
if (SystemUtils.IS_OS_LINUX) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
just a thought SystemUtils.IS_OS_LINUX && io.netty.channel.epoll.Epoll.isAvailable()
?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I didn't know about Epoll.isAvailable()
, probably was introduced later in Netty. Then we can just use that check, no need to verify we are on Linux.
@rdhabalia Updated |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
👍
Motivation
https://github.com/apache/incubator-pulsar/wiki/PIP-1:-Pulsar-Proxy
As described in the wiki page, added a proxy component that exposes a stateless service that talk Pulsar binary protocol.