Skip to content

Commit

Permalink
ptp
Browse files Browse the repository at this point in the history
Signed-off-by: forgive_dengkai <forgive_dengkai@163.com>
  • Loading branch information
forgivedengkai committed Oct 18, 2023
1 parent cfdc757 commit 303dd29
Show file tree
Hide file tree
Showing 18 changed files with 207 additions and 105 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ public class Bootstrap {
public static void main(String[] args) {
try {
injector = Guice.createInjector(new BrokerModule() );
System.err.println(injector.getAllBindings());
// System.err.println(injector.getAllBindings());
Options options = ServerUtil.buildCommandlineOptions(new Options());
commandLine = ServerUtil.parseCmdLine("osx", args, buildCommandlineOptions(options),
new PosixParser());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ public class PutBatchSinkUtil {

CacheBuilder.newBuilder()
.maximumSize(2000)
.expireAfterWrite(10, TimeUnit.MINUTES)
.expireAfterWrite(10, TimeUnit.SECONDS)
.concurrencyLevel(100)
.recordStats()
.softValues()
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,56 @@
package org.fedai.osx.broker.grpc;

import com.google.inject.Inject;
import com.google.inject.Singleton;
import lombok.extern.slf4j.Slf4j;
import org.fedai.osx.broker.provider.TechProviderRegister;
import org.fedai.osx.broker.util.ContextUtil;
import org.fedai.osx.core.constant.UriConstants;
import org.fedai.osx.core.context.OsxContext;
import org.fedai.osx.core.provider.TechProvider;
import org.ppc.ptp.PrivateTransferProtocolGrpc;
import org.ppc.ptp.PrivateTransferTransportGrpc;
@Singleton
@Slf4j
public class PcpInnerService extends PrivateTransferTransportGrpc.PrivateTransferTransportImplBase {

@Inject
TechProviderRegister techProviderRegister;

public void peek(org.ppc.ptp.Osx.PeekInbound request,
io.grpc.stub.StreamObserver<org.ppc.ptp.Osx.TransportOutbound> responseObserver) {
OsxContext osxContext = new OsxContext();
osxContext.setUri(UriConstants.PEEK);
ContextUtil.assableContextFromInbound(osxContext);
TechProvider techProvider= techProviderRegister.getTechProvider(osxContext);
techProvider.processGrpcPeek(osxContext,request, responseObserver);
}

public void pop(org.ppc.ptp.Osx.PopInbound request,
io.grpc.stub.StreamObserver<org.ppc.ptp.Osx.TransportOutbound> responseObserver) {
OsxContext osxContext = new OsxContext();
osxContext.setUri(UriConstants.POP);
ContextUtil.assableContextFromInbound(osxContext);
TechProvider techProvider=techProviderRegister.getTechProvider(osxContext);
techProvider.processGrpcPop(osxContext,request, responseObserver);
}

public void push(org.ppc.ptp.Osx.PushInbound request,
io.grpc.stub.StreamObserver<org.ppc.ptp.Osx.TransportOutbound> responseObserver) {
OsxContext osxContext = new OsxContext();
osxContext.setUri(UriConstants.PUSH);
ContextUtil.assableContextFromInbound(osxContext);
TechProvider techProvider= techProviderRegister.getTechProvider(osxContext);
techProvider.processGrpcPush(osxContext,request, responseObserver);
}

public void release(org.ppc.ptp.Osx.ReleaseInbound request,
io.grpc.stub.StreamObserver<org.ppc.ptp.Osx.TransportOutbound> responseObserver) {
OsxContext osxContext = new OsxContext();
osxContext.setUri(UriConstants.RELEASE);
ContextUtil.assableContextFromInbound(osxContext);
TechProvider techProvider= techProviderRegister.getTechProvider(osxContext);
techProvider.processGrpcRelease(osxContext,request, responseObserver);
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@

@Singleton
@Slf4j
public class PcpGrpcService extends PrivateTransferProtocolGrpc.PrivateTransferProtocolImplBase {
public class PcpInterService extends PrivateTransferProtocolGrpc.PrivateTransferProtocolImplBase {

@Inject
TechProviderRegister techProviderRegister;
Expand Down Expand Up @@ -141,23 +141,7 @@ public void onCompleted() {
}
}

public void peek(org.ppc.ptp.Osx.PeekInbound request,
io.grpc.stub.StreamObserver<org.ppc.ptp.Osx.TransportOutbound> responseObserver) {
OsxContext osxContext = new OsxContext();
osxContext.setUri(UriConstants.PEEK);
TechProvider techProvider= prepare(osxContext);
techProvider.processGrpcPeek(osxContext,request, responseObserver);
}

/**
*/
public void pop(org.ppc.ptp.Osx.PopInbound request,
io.grpc.stub.StreamObserver<org.ppc.ptp.Osx.TransportOutbound> responseObserver) {
OsxContext osxContext = new OsxContext();
osxContext.setUri(UriConstants.POP);
TechProvider techProvider= prepare(osxContext);
techProvider.processGrpcPop(osxContext,request, responseObserver);
}

private TechProvider prepare(OsxContext osxContext){
ContextUtil.assableContextFromInbound(osxContext);
Expand All @@ -166,23 +150,5 @@ private TechProvider prepare(OsxContext osxContext){
}


/**
*/
public void push(org.ppc.ptp.Osx.PushInbound request,
io.grpc.stub.StreamObserver<org.ppc.ptp.Osx.TransportOutbound> responseObserver) {
OsxContext osxContext = new OsxContext();

TechProvider techProvider= prepare(osxContext);
techProvider.processGrpcPush(osxContext,request, responseObserver);
}

/**
*/
public void release(org.ppc.ptp.Osx.ReleaseInbound request,
io.grpc.stub.StreamObserver<org.ppc.ptp.Osx.TransportOutbound> responseObserver) {
OsxContext osxContext = new OsxContext();
TechProvider techProvider= prepare(osxContext);
techProvider.processGrpcRelease(osxContext,request, responseObserver);
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
import org.fedai.osx.core.context.Protocol;
import org.fedai.osx.core.service.InboundPackage;
import org.fedai.osx.core.service.OutboundPackage;
import org.fedai.osx.core.utils.FlowLogUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand Down Expand Up @@ -59,8 +60,9 @@ public io.grpc.stub.StreamObserver<com.webank.ai.eggroll.api.networking.proxy.Pr

public void unaryCall(com.webank.ai.eggroll.api.networking.proxy.Proxy.Packet request,
io.grpc.stub.StreamObserver<com.webank.ai.eggroll.api.networking.proxy.Proxy.Packet> responseObserver) {
OsxContext context = ContextUtil.buildFateContext(Protocol.grpc);
try {
OsxContext context = ContextUtil.buildFateContext(Protocol.grpc);

InboundPackage<Proxy.Packet> data = new InboundPackage<>();
data.setBody(request);
context.setDataSize(request.getSerializedSize());
Expand All @@ -70,6 +72,9 @@ public void unaryCall(com.webank.ai.eggroll.api.networking.proxy.Proxy.Packet re
}catch (Exception e){
responseObserver.onError(e);
}
finally {
FlowLogUtil.printFlowLog(context);
}
}


Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,16 @@ final public TechProvider select(String techProviderCode ) {
}
}

public TechProvider getTechProvider(OsxContext context){
TechProvider techProvider = this.select(context.getTechProviderCode());
if (techProvider == null) {
techProvider = this.select("default");
}
return techProvider;
}





public void init() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@
import org.fedai.osx.core.service.AbstractServiceAdaptorNew;
import org.ppc.ptp.Osx;
import org.ppc.ptp.PrivateTransferProtocolGrpc;
import org.ppc.ptp.PrivateTransferTransportGrpc;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@Singleton
Expand Down Expand Up @@ -139,7 +140,7 @@ protected ConsumerResponse transformExceptionInfo(OsxContext context, ExceptionI
private Osx.TransportOutbound redirect(OsxContext context, RouterInfo routerInfo, Osx.PopInbound inbound) {
ManagedChannel managedChannel = GrpcConnectionFactory.createManagedChannel(routerInfo,true);
context.setActionType(ActionType.REDIRECT_CONSUME.name());
PrivateTransferProtocolGrpc.PrivateTransferProtocolBlockingStub stub = PrivateTransferProtocolGrpc.newBlockingStub(managedChannel);
PrivateTransferTransportGrpc.PrivateTransferTransportBlockingStub stub = PrivateTransferTransportGrpc.newBlockingStub(managedChannel);
return stub.pop(inbound);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import io.grpc.netty.shaded.io.netty.handler.ssl.ClientAuth;
import io.grpc.netty.shaded.io.netty.handler.ssl.SslContextBuilder;
import io.grpc.netty.shaded.io.netty.handler.ssl.SslProvider;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;
import org.eclipse.jetty.server.HttpConnectionFactory;
import org.eclipse.jetty.server.Server;
Expand All @@ -31,7 +32,9 @@
import org.eclipse.jetty.servlet.ServletContextHandler;
import org.eclipse.jetty.servlet.ServletHolder;
import org.eclipse.jetty.util.ssl.SslContextFactory;
import org.fedai.osx.broker.grpc.PcpGrpcService;

import org.fedai.osx.broker.grpc.PcpInnerService;
import org.fedai.osx.broker.grpc.PcpInterService;
import org.fedai.osx.broker.grpc.ProxyGrpcService;
import org.fedai.osx.broker.grpc.ServiceExceptionHandler;
import org.fedai.osx.broker.http.DispatchServlet;
Expand Down Expand Up @@ -62,18 +65,18 @@
* http1.X + grpc
*/
@Singleton
@Slf4j
public class OsxServer {

Logger logger = LoggerFactory.getLogger(OsxServer.class);
io.grpc.Server server;
io.grpc.Server tlsServer;
org.eclipse.jetty.server.Server httpServer;
org.eclipse.jetty.server.Server httpsServer;
@Inject
ProxyGrpcService proxyGrpcService;
@Inject
PcpGrpcService pcpGrpcService;

PcpInterService pcpInterService;
@Inject
PcpInnerService pcpInnerService;
@Inject
DispatchServlet dispatchServlet;

Expand All @@ -83,13 +86,13 @@ private synchronized void init() {
// pcpGrpcService = new PcpGrpcService();
server = buildServer();
if (MetaInfo.PROPERTY_OPEN_HTTP_SERVER) {
logger.info("prepare to create http server");
log.info("prepare to create http server");
httpServer = buildHttpServer();
if (httpServer == null) {
System.exit(0);
}
if (MetaInfo.PROPERTY_HTTP_USE_TLS) {
logger.info("prepare to create http server with TLS");
log.info("prepare to create http server with TLS");
httpsServer = buildHttpsServer();
if (httpsServer == null) {
System.exit(0);
Expand All @@ -98,7 +101,7 @@ private synchronized void init() {
}
tlsServer = buildTlsServer();
}catch(Exception e){
logger.error("server init error ",e);
log.error("server init error ",e);
e.printStackTrace();
}
}
Expand All @@ -118,7 +121,7 @@ public Server buildHttpServer() {
server.setHandler(buildServlet());
return server;
} catch (Exception e) {
logger.error("build http server error", e);
log.error("build http server error", e);
}
return null;
}
Expand Down Expand Up @@ -173,7 +176,7 @@ public Server buildHttpsServer() {
// }).start();
return server;
} catch (Exception e) {
logger.error("build https server error = {}", e.getMessage());
log.error("build https server error = {}", e.getMessage());
e.printStackTrace();
}
return null;
Expand All @@ -193,10 +196,10 @@ public boolean start() {
//grpc
try {
server.start();
logger.info("listen grpc port {} success", MetaInfo.PROPERTY_GRPC_PORT);
log.info("listen grpc port {} success", MetaInfo.PROPERTY_GRPC_PORT);
} catch (Exception e) {
if (e instanceof IOException || e.getCause() instanceof java.net.BindException) {
logger.error("port {} already in use, please try to choose another one !!!!", MetaInfo.PROPERTY_GRPC_PORT);
log.error("port {} already in use, please try to choose another one !!!!", MetaInfo.PROPERTY_GRPC_PORT);
}
e.printStackTrace();
return false;
Expand All @@ -206,11 +209,11 @@ public boolean start() {
try {
if (httpServer != null) {
httpServer.start();
logger.info("listen http port {} success", MetaInfo.PROPERTY_HTTP_PORT);
log.info("listen http port {} success", MetaInfo.PROPERTY_HTTP_PORT);
}
} catch (Exception e) {
if (e instanceof java.net.BindException || e.getCause() instanceof java.net.BindException) {
logger.error("port {} already in use, please try to choose another one !!!!", MetaInfo.PROPERTY_HTTP_PORT);
log.error("port {} already in use, please try to choose another one !!!!", MetaInfo.PROPERTY_HTTP_PORT);
}
e.printStackTrace();
return false;
Expand All @@ -219,27 +222,27 @@ public boolean start() {
//tls
try {
if (tlsServer != null) {
logger.info("grpc tls server try to start, listen port {}", MetaInfo.PROPERTY_GRPC_TLS_PORT);
log.info("grpc tls server try to start, listen port {}", MetaInfo.PROPERTY_GRPC_TLS_PORT);
tlsServer.start();
logger.info("listen grpc tls port {} success", MetaInfo.PROPERTY_GRPC_TLS_PORT);
log.info("listen grpc tls port {} success", MetaInfo.PROPERTY_GRPC_TLS_PORT);
}
} catch (Exception e) {
if (e instanceof java.net.BindException || e.getCause() instanceof java.net.BindException) {
logger.error("port {} already in use, please try to choose another one !!!!", MetaInfo.PROPERTY_GRPC_TLS_PORT);
log.error("port {} already in use, please try to choose another one !!!!", MetaInfo.PROPERTY_GRPC_TLS_PORT);
}
e.printStackTrace();

return false;
}

//https
try {
if (httpsServer != null) {
httpsServer.start();
logger.info("listen https port {} success", MetaInfo.PROPERTY_HTTPS_PORT);
log.info("listen https port {} success", MetaInfo.PROPERTY_HTTPS_PORT);
}
} catch (Exception e) {
if (e instanceof java.net.BindException || e.getCause() instanceof java.net.BindException) {
logger.error("port {} already in use, please try to choose another one !!!!", MetaInfo.PROPERTY_HTTPS_PORT);
log.error("port {} already in use, please try to choose another one !!!!", MetaInfo.PROPERTY_HTTPS_PORT);
}
e.printStackTrace();
return false;
Expand All @@ -261,14 +264,12 @@ private io.grpc.Server buildTlsServer() {
.clientAuth(ClientAuth.REQUIRE)
.sessionTimeout(MetaInfo.PROPERTY_GRPC_SSL_SESSION_TIME_OUT)
.sessionCacheSize(MetaInfo.PROPERTY_HTTP_SSL_SESSION_CACHE_SIZE);
logger.info("running in secure mode. server crt path: {}, server key path: {}, ca crt path: {}.",
log.info("running in secure mode. server crt path: {}, server key path: {}, ca crt path: {}.",
certChainFilePath, privateKeyFilePath, trustCertCollectionFilePath);
//serverBuilder.executor(executor);
nettyServerBuilder.sslContext(GrpcSslContexts.configure(sslContextBuilder, SslProvider.OPENSSL).build());
nettyServerBuilder.addService(ServerInterceptors.intercept(proxyGrpcService, new ServiceExceptionHandler(), new ContextPrepareInterceptor()));
nettyServerBuilder.addService(ServerInterceptors.intercept(pcpGrpcService, new ServiceExceptionHandler(), new ContextPrepareInterceptor()));


nettyServerBuilder.addService(ServerInterceptors.intercept(pcpInterService, new ServiceExceptionHandler(), new ContextPrepareInterceptor()));
nettyServerBuilder
.executor(Executors.newCachedThreadPool())
.maxConcurrentCallsPerConnection(MetaInfo.PROPERTY_GRPC_SERVER_MAX_CONCURRENT_CALL_PER_CONNECTION)
Expand Down Expand Up @@ -299,12 +300,12 @@ private io.grpc.Server buildTlsServer() {
return null;
}


private io.grpc.Server buildServer() {
SocketAddress address = new InetSocketAddress(MetaInfo.PROPERTY_BIND_HOST, MetaInfo.PROPERTY_GRPC_PORT);
NettyServerBuilder nettyServerBuilder = NettyServerBuilder.forAddress(address);
nettyServerBuilder.addService(ServerInterceptors.intercept(proxyGrpcService, new ServiceExceptionHandler(), new ContextPrepareInterceptor()));
nettyServerBuilder.addService(ServerInterceptors.intercept(pcpGrpcService, new ServiceExceptionHandler(), new ContextPrepareInterceptor()));
nettyServerBuilder.addService(ServerInterceptors.intercept(pcpInterService, new ServiceExceptionHandler(), new ContextPrepareInterceptor()));
nettyServerBuilder.addService(ServerInterceptors.intercept(pcpInnerService, new ServiceExceptionHandler(), new ContextPrepareInterceptor()));
nettyServerBuilder
.executor(Executors.newCachedThreadPool())
.maxConcurrentCallsPerConnection(MetaInfo.PROPERTY_GRPC_SERVER_MAX_CONCURRENT_CALL_PER_CONNECTION)
Expand Down
Loading

0 comments on commit 303dd29

Please sign in to comment.