Skip to content

Commit

Permalink
fix bug
Browse files Browse the repository at this point in the history
Signed-off-by: forgive_dengkai <forgive_dengkai@163.com>
  • Loading branch information
forgivedengkai committed Jul 28, 2023
1 parent 6282caa commit 784a200
Show file tree
Hide file tree
Showing 4 changed files with 41 additions and 56 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -67,8 +67,6 @@ public void addLongPullingQueue(LongPullingHold longPullingHold) {
}

public synchronized int answerLongPulling() {


/*
* 这里需要改为ack 后才加1 ,要不然这里会丢消息
*/
Expand All @@ -78,9 +76,13 @@ public synchronized int answerLongPulling() {
while (this.longPullingQueue.size() > 0) {
LongPullingHold longPullingHold = this.longPullingQueue.poll();
try {

// long indexFileOffset = transferQueue.getIndexQueue().getLogicOffset().get();

io.grpc.Context grpcContext = longPullingHold.getGrpcContext();
if(grpcContext!=null){
if(grpcContext.isCancelled()){
logger.error("topic {} consumer grpc context is cancelled",transferId);
continue;
}
}
long current= System.currentTimeMillis();
long needOffset = longPullingHold.getNeedOffset();
if(transferQueue==null){
Expand Down Expand Up @@ -135,19 +137,7 @@ public synchronized int answerLongPulling() {
}
} catch (Exception e) {
logger.error("topic {} answer long pulling error ",transferId,e);

longPullingHold.throwException(e);
// try {
// Thread.sleep(1000git ggi






// } catch (InterruptedException interruptedException) {
// logger.error("interruptedException : ",interruptedException);
// }
}
}
if (reputList != null) {
Expand All @@ -165,6 +155,7 @@ private void handleExpire(LongPullingHold longPullingHold){
public static class LongPullingHold {
Logger logger = LoggerFactory.getLogger(LongPullingHold.class);
FateContext context;
io.grpc.Context grpcContext;
StreamObserver streamObserver;
HttpServletResponse httpServletResponse;
long expireTimestamp;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ public <ReqT, RespT> ServerCall.Listener<ReqT> interceptCall(ServerCall<ReqT, Re
String[] remoteAddrSplited = remoteAddr.split(":");
String remoteIp = remoteAddrSplited[0].replaceAll("\\/", "");
Context context = Context.current().withValue(sourceIp, remoteIp);

return Contexts.interceptCall(context, call, headers, next);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import com.osx.api.router.RouterInfo;
import com.osx.broker.ServiceContainer;
import com.osx.broker.consumer.UnaryConsumer;
import com.osx.broker.grpc.ContextPrepareInterceptor;
import com.osx.broker.queue.CreateQueueResult;
import com.osx.broker.queue.TransferQueue;
import com.osx.broker.queue.TransferQueueApplyInfo;
Expand Down Expand Up @@ -110,6 +111,8 @@ protected Osx.Outbound doService(FateContext context, InboundPackage<Osx.Inbound
if (offset < 0) {

UnaryConsumer.LongPullingHold longPullingHold = new UnaryConsumer.LongPullingHold();

longPullingHold.setGrpcContext(io.grpc.Context.current());
longPullingHold.setNeedOffset(offset);
longPullingHold.setStreamObserver(streamObserver);
longPullingHold.setContext(context.subContext());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,15 +30,24 @@
@FixMethodOrder(MethodSorters.NAME_ASCENDING)
public class QueueTest {
Logger logger = LoggerFactory.getLogger(QueueTest.class);
String ip = "localhost";
static String ip = "localhost";
//int port = 8250;//nginx
int port = 9370;//nginx
String desPartyId = "9999";
String desRole = "";
String srcPartyId = "10000";
String srcRole = "";
String transferId = "testTransferId";
String sessionId = "testSessionId";
static int port = 9370;//nginx
static String desPartyId = "9999";
static String desRole = "";
static String srcPartyId = "10000";
static String srcRole = "";
static String transferId = "testTransferId";
static String sessionId = "testSessionId";
static FateContext fateContext= new FateContext();
static RouterInfo routerInfo= new RouterInfo();
static {
routerInfo.setHost(ip);
routerInfo.setPort(port);
}




//4359615

Expand Down Expand Up @@ -97,10 +106,7 @@ public void test02Query() {
inboundBuilder.putMetadata(Osx.Metadata.TargetComponentName.name(), "");
inboundBuilder.putMetadata(Osx.Metadata.SourceComponentName.name(), "");
inboundBuilder.putMetadata(Osx.Metadata.MessageTopic.name(), transferId);
FateContext fateContext= new FateContext();
RouterInfo routerInfo= new RouterInfo();
routerInfo.setHost("localhost");
routerInfo.setPort(9370);

Osx.Outbound outbound =TransferUtil.redirect(fateContext,inboundBuilder.build(),routerInfo,false);
// Osx.Outbound outbound = blockingStub.invoke(inboundBuilder.build());
Osx.TopicInfo topicInfo = null;
Expand Down Expand Up @@ -156,15 +162,9 @@ public void test04UnaryProduce() {
//inboundBuilder.getMetadataMap().put(Pcp.Metadata.MessageOffSet.name(),);
Osx.Message.Builder messageBuilder = Osx.Message.newBuilder();
//4359615
messageBuilder.setBody(ByteString.copyFrom(createBigArray(10359615)));
//messageBuilder.setHead(ByteString.copyFrom(("test head " + i).getBytes()));
//messageBuilder.setBody(ByteString.copyFrom(createBigArray(10359615)));
messageBuilder.setHead(ByteString.copyFrom(("test head " + i).getBytes()));
inboundBuilder.setPayload(messageBuilder.build().toByteString());

FateContext fateContext= new FateContext();
RouterInfo routerInfo= new RouterInfo();
routerInfo.setHost("localhost");
routerInfo.setPort(9370);
// System.err.println(routerInfo);
Osx.Outbound outbound =TransferUtil.redirect(fateContext,inboundBuilder.build(),routerInfo,false);


Expand Down Expand Up @@ -196,23 +196,7 @@ public void testTopicApply(){
inboundBuilder.putMetadata(Osx.Metadata.MessageTopic.name(), "testTopic0001");
inboundBuilder.putMetadata(Osx.Metadata.InstanceId.name(),"localhost:9999" );
inboundBuilder.putMetadata(Osx.Header.SessionID.name(), "testSessionId");


// TopicApplyRequest topicApplyRequest = new TopicApplyRequest();
// topicApplyRequest.setTopic("testTopic0001");
// topicApplyRequest.setInstanceId("localhost:9999");
// topicApplyRequest.setSessionId("testSessionId");
// inboundBuilder.setPayload(ByteString.copyFrom(JsonUtil.object2Json(topicApplyRequest).getBytes(StandardCharsets.UTF_8)));
// RouterInfo routerInfo = new RouterInfo();
// routerInfo.setHost("localhost");
// routerInfo.setPort(9370);
// ManagedChannel managedChannel = GrpcConnectionFactory.createManagedChannel(routerInfo,true);
// FireworkServiceGrpc.FireworkServiceBlockingStub stub = FireworkServiceGrpc.newBlockingStub(managedChannel);
//PrivateTransferProtocolGrpc.PrivateTransferProtocolBlockingStub stub = PrivateTransferProtocolGrpc.newBlockingStub(managedChannel);
Osx.Outbound outbound = blockingStub.invoke(inboundBuilder.build());

//TopicApplyResponse topicApplyResponse = JsonUtil.json2Object(outbound.getPayload().toByteArray(),TopicApplyResponse.class);

Osx.Outbound outbound =TransferUtil.redirect(fateContext,inboundBuilder.build(),routerInfo,false);
System.err.println(outbound);

}
Expand All @@ -233,7 +217,7 @@ public void test07Ack(long index) {
inboundBuilder.putMetadata(Osx.Metadata.SourceComponentName.name(), "");
inboundBuilder.putMetadata(Osx.Metadata.MessageTopic.name(), transferId);
inboundBuilder.putMetadata(Osx.Metadata.MessageOffSet.name(), Long.toString(index));
Osx.Outbound outbound = blockingStub.invoke(inboundBuilder.build());
Osx.Outbound outbound =TransferUtil.redirect(fateContext,inboundBuilder.build(),routerInfo,false);
System.err.println("ack response:" + outbound);
}

Expand All @@ -259,7 +243,13 @@ public void test06UnaryConsume() {
inboundBuilder.putMetadata(Osx.Metadata.MessageTopic.name(), transferId);
inboundBuilder.putMetadata(Osx.Metadata.MessageOffSet.name(), "-1");
inboundBuilder.putMetadata(Osx.Metadata.Timeout.name(),"100000");
consumeResponse = blockingStub.invoke(inboundBuilder.build());


// System.err.println(routerInfo);
consumeResponse =TransferUtil.redirect(fateContext,inboundBuilder.build(),routerInfo,false);


//consumeResponse = blockingStub.invoke(inboundBuilder.build());
System.err.println("response : "+consumeResponse);

String indexString = consumeResponse.getMetadataMap().get(Osx.Metadata.MessageOffSet.name());
Expand Down

0 comments on commit 784a200

Please sign in to comment.