Skip to content

Commit

Permalink
format code
Browse files Browse the repository at this point in the history
Signed-off-by: forgive_dengkai <forgive_dengkai@163.com>
  • Loading branch information
forgivedengkai committed Nov 20, 2023
1 parent eae6edb commit d820245
Show file tree
Hide file tree
Showing 195 changed files with 2,461 additions and 3,417 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
*/

package org.fedai.osx.api.router;

import lombok.Data;


Expand All @@ -40,19 +41,19 @@ public class RouterInfo {

public String toKey() {
StringBuffer sb = new StringBuffer();
if(Protocol.grpc.equals(protocol)) {
if (Protocol.grpc.equals(protocol)) {
sb.append(host).append("_").append(port);
if (negotiationType != null)
sb.append("_").append(negotiationType);
}else {
} else {
sb.append(url);
}
return sb.toString();
}

@Override
public String toString() {
return toKey();
return toKey();
}

public String getResource() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,17 +14,21 @@
* limitations under the License.
*/
package org.fedai.osx.api.tech.provider;

import io.grpc.stub.StreamObserver;
import org.ppc.ptp.Osx;

import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse;

public interface TechProvider {
//用于处理http1.X请求
void processHttpInvoke(HttpServletRequest httpServletRequest,HttpServletResponse httpServletResponse);
void processHttpInvoke(HttpServletRequest httpServletRequest, HttpServletResponse httpServletResponse);

//用于处理grpc非流式请求
void processGrpcInvoke(Osx.Inbound request,
io.grpc.stub.StreamObserver<Osx.Outbound> responseObserver);

//用于处理grpc流式请求
public StreamObserver<Osx.Inbound> processGrpcTransport(Osx.Inbound inbound, StreamObserver<Osx.Outbound> responseObserver);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,11 +40,12 @@
public class Bootstrap {
static Logger logger = LoggerFactory.getLogger(Bootstrap.class);
static CommandLine commandLine;
static Object lockObject= new Object();
static Object lockObject = new Object();
static Injector injector;

public static void main(String[] args) {
try {
injector = Guice.createInjector(new BrokerModule() );
injector = Guice.createInjector(new BrokerModule());
Options options = ServerUtil.buildCommandlineOptions(new Options());
commandLine = ServerUtil.parseCmdLine("osx", args, buildCommandlineOptions(options),
new PosixParser());
Expand All @@ -61,18 +62,18 @@ public static void main(String[] args) {
packages.add(Bootstrap.class.getPackage().getName());
ApplicationStartedRunnerUtils.run(injector, packages, args);

boolean startOk = injector.getInstance(OsxServer.class).start();
if(!startOk){
boolean startOk = injector.getInstance(OsxServer.class).start();
if (!startOk) {
System.exit(-1);
}
Thread shutDownThread = new Thread(bootstrap::stop);
Runtime.getRuntime().addShutdownHook(shutDownThread);
synchronized (lockObject){
synchronized (lockObject) {
lockObject.wait();
}

} catch (Exception ex) {
logger.error("broker start failed ",ex);
logger.error("broker start failed ", ex);
ex.printStackTrace();
System.exit(1);
}
Expand All @@ -88,7 +89,7 @@ private static Options buildCommandlineOptions(final Options options) {
public static void parseConfig(String configDir) {
try {
MetaInfo.PROPERTY_CONFIG_DIR = configDir;
String configFilePath = configDir+ "/broker/broker.properties";
String configFilePath = configDir + "/broker/broker.properties";
Properties environment = PropertiesUtil.getProperties(configFilePath);
MetaInfo.init(environment);
} catch (Exception e) {
Expand All @@ -98,16 +99,15 @@ public static void parseConfig(String configDir) {
}

public void start(String[] args) {
// ServiceContainer.init();
JvmInfoCounter.start();
}

public void stop() {
logger.info("try to shutdown server ...");
if (injector != null) {
TransferQueueManager transferQueueManager = injector.getInstance(TransferQueueManager.class);
if(transferQueueManager!=null)
transferQueueManager.destroyAll();
TransferQueueManager transferQueueManager = injector.getInstance(TransferQueueManager.class);
if (transferQueueManager != null)
transferQueueManager.destroyAll();
}
}

Expand Down

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -6,5 +6,5 @@

@FunctionalInterface
public interface MsgEventCallback {
void callback(ConsumerManager consumerManager,TransferQueue transferQueue , MessageExt message) throws Exception;
void callback(ConsumerManager consumerManager, TransferQueue transferQueue, MessageExt message) throws Exception;
}

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@

public enum MessageFlag {

SENDMSG(0), ERROR(1), COMPELETED(2),BACKMSG(3);
SENDMSG(0), ERROR(1), COMPELETED(2), BACKMSG(3);

private int flag;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,8 +28,9 @@
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;

@Singleton
public class ConsumerManager implements Lifecycle {
public class ConsumerManager implements Lifecycle {
Logger logger = LoggerFactory.getLogger(ConsumerManager.class);
@Inject
TransferQueueManager transferQueueManager;
Expand Down Expand Up @@ -63,6 +64,7 @@ public void run() {
public String getServiceName() {
return "longPullingThread";
}

@Override
public void run() {
int interval = 200;
Expand Down Expand Up @@ -144,11 +146,11 @@ public UnaryConsumer getUnaryConsumer(String transferId) {
// }
// }

public UnaryConsumer getOrCreateUnaryConsumer(String sessionId,String topic) {
String indexKey = TransferQueueManager.assembleTopic(sessionId,topic);
public UnaryConsumer getOrCreateUnaryConsumer(String sessionId, String topic) {
String indexKey = TransferQueueManager.assembleTopic(sessionId, topic);
if (unaryConsumerMap.get(indexKey) == null) {
UnaryConsumer unaryConsumer =
new UnaryConsumer(transferQueueManager,consumerManager,consumerIdIndex.get(),sessionId, topic);
new UnaryConsumer(transferQueueManager, consumerManager, consumerIdIndex.get(), sessionId, topic);
unaryConsumerMap.putIfAbsent(indexKey, unaryConsumer);
return unaryConsumerMap.get(indexKey);
} else {
Expand All @@ -158,11 +160,11 @@ public UnaryConsumer getOrCreateUnaryConsumer(String sessionId,String topic) {

public void onComplete(String indexKey) {

if(this.unaryConsumerMap.containsKey(indexKey)) {
if (this.unaryConsumerMap.containsKey(indexKey)) {
this.unaryConsumerMap.get(indexKey).destroy();
this.unaryConsumerMap.remove(indexKey);
}else{
// logger.error("cannot found {} in consumer map ",indexKey);
} else {
// logger.error("cannot found {} in consumer map ",indexKey);
}
// if(this.eventDrivenConsumerMap.contains(indexKey)){
// this.eventDrivenConsumerMap.get(indexKey).destroy();
Expand All @@ -176,8 +178,6 @@ public void onComplete(String indexKey) {
public void init() {




}

@Override
Expand Down

This file was deleted.

Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
package org.fedai.osx.broker.consumer;

import org.fedai.osx.broker.queue.AbstractQueue;
import org.fedai.osx.broker.queue.TransferQueue;

@FunctionalInterface
public interface EventDriverRule {
Expand Down
Loading

0 comments on commit d820245

Please sign in to comment.