Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

PIP-1 - Introduce Pulsar proxy component #548

Merged
merged 6 commits into from
Jul 11, 2017
Merged

Conversation

merlimat
Copy link
Contributor

@merlimat merlimat commented Jul 6, 2017

Motivation

https://github.com/apache/incubator-pulsar/wiki/PIP-1:-Pulsar-Proxy

As described in the wiki page, added a proxy component that exposes a stateless service that talk Pulsar binary protocol.

@merlimat merlimat added the type/feature The PR added a new feature or issue requested a new feature label Jul 6, 2017
@merlimat merlimat added this to the 1.19.0-incubating milestone Jul 6, 2017
@merlimat merlimat self-assigned this Jul 6, 2017
@@ -59,7 +62,7 @@ public BinaryProtoLookupService(PulsarClientImpl client, String serviceUrl, bool
* @param destination: topic-name
* @return broker-socket-address that serves given topic
*/
public CompletableFuture<InetSocketAddress> getBroker(DestinationName destination) {
public CompletableFuture<Pair<InetSocketAddress, InetSocketAddress>> getBroker(DestinationName destination) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

do you think using InetSocketAddress[] would be better to avoid creation of Pair object here.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In this case it would be the same number of object allocations, either a Pair<> or the InetSocketAddress[]. No differences, except Pair conveys better the fact that 2 addresses are going to be there.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

yes, I meant avoiding creation of Pair.

conveys better the fact that 2 addresses are going to be there.

Yes, make sense.

@@ -77,6 +78,8 @@
private final int maxNumberOfRejectedRequestPerConnection;
private final int rejectedRequestResetTimeSec = 60;

private String targetBrokerAddress = null;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

do you think if we can add proxy in the name it will easy to understand.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sure, will rename that

@@ -188,6 +197,11 @@ message CommandLookupTopicResponse {
optional bool authoritative = 5 [default = false];
optional ServerError error = 6;
optional string message = 7;

// If it's true, indicates to the client that it must
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

format?

// Original principal that was verified by
// a Pulsar proxy. In this case the auth info above
// will the the auth of the proxy itself
optional string original_principal = 7;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

if proxy is going to do auth&auth for original-principal then broker will not do any validation on this original_principal right? if that's true then are we passing it for debugging only? and I don't see if we are setting it in Commands.java

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Proxy is not doing authorization on the data connections. The reason is that after the initial connect/connected it just forward everything on both sides.

The broker still needs to check whether that particular role is authorized to pulblish on a specific topic. So the proxy relays the original client principal, and that will be used by the broker for the authorization, trusting that the proxy has already validated the authentication.

if (future.isSuccess()) {
outboundChannel.read();
} else {
inboundChannel.close();
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

should we log warn message here?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Fixed

public void operationComplete(Future<Void> future) throws Exception {
// This is invoked when the write operation on the paired connection is completed
if (future.isSuccess()) {
outboundChannel.read();
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

but we have already triggered read on outboundChannel on line-107, so channel-handler will continue reading even if we don't call here. right?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

No, the auto-read on the channel is set to false. The reason is to adapt to the speed of the slowest connection.

So, we read 1 buffer from c1, write into c2. When the write is completed (meaning we were able to write on the socket because the TCP send window was open), then we read the next buffer.

brokerURI = new URI(brokerServiceUrl);
} catch (URISyntaxException e) {
proxyConnection.ctx().writeAndFlush(
Commands.newLookupErrorResponse(ServerError.ServiceNotReady, e.getMessage(), clientRequestId));
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ServerError.MetadataError?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Makes more sense. Fixed

}

private void close() {
ctx.close();
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

should we have additional state= 'ConnectionClosed` ?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Fixed

@merlimat
Copy link
Contributor Author

@rdhabalia Addressed comments

Copy link
Contributor

@rdhabalia rdhabalia left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

LGTM .. just minor comments.

#

# Zookeeper quorum connection string (comma-separated)
zookeeperServers=127.0.0.1:2181
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

in other conf file we have kept it empty. should we do the same?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, I forgot these after testing

private final ZooKeeperCache localZkCache;
private final LocalZooKeeperConnectionService localZkConnectionSvc;

private final ZooKeeperDataCache<LoadReport> brokerInfo;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

org.apache.pulsar.policies.data.loadbalancer.ServiceLookupData because ModularLoadManager generates LocalBrokerData.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Fixed

DefaultExports.initialize();
server.addServlet("/metrics", new ServletHolder(MetricsServlet.class));

// server.addServlet("/*", DiscoveryServiceServlet.class, initParameters);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

does this comment require for any indication?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

forgot this, will remove

this.serviceUrlTls = String.format("pulsar://%s:%d/", hostname, proxyConfig.getServicePortTls());

EventLoopGroup acceptorEventLoop, workersEventLoop;
if (SystemUtils.IS_OS_LINUX) {
Copy link
Contributor

@rdhabalia rdhabalia Jul 11, 2017

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

just a thought SystemUtils.IS_OS_LINUX && io.netty.channel.epoll.Epoll.isAvailable()?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I didn't know about Epoll.isAvailable(), probably was introduced later in Netty. Then we can just use that check, no need to verify we are on Linux.

@merlimat
Copy link
Contributor Author

@rdhabalia Updated

Copy link
Contributor

@rdhabalia rdhabalia left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

👍

@merlimat merlimat merged commit f36b70c into apache:master Jul 11, 2017
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
type/feature The PR added a new feature or issue requested a new feature
Projects
None yet
Development

Successfully merging this pull request may close these issues.

2 participants