diff --git a/collector/src/main/java/com/navercorp/pinpoint/collector/grpc/config/GrpcSpanReceiverConfiguration.java b/collector/src/main/java/com/navercorp/pinpoint/collector/grpc/config/GrpcSpanReceiverConfiguration.java index 15c5ee28b2ff..c0afa08e8a48 100644 --- a/collector/src/main/java/com/navercorp/pinpoint/collector/grpc/config/GrpcSpanReceiverConfiguration.java +++ b/collector/src/main/java/com/navercorp/pinpoint/collector/grpc/config/GrpcSpanReceiverConfiguration.java @@ -63,7 +63,7 @@ public GrpcSpanReceiverConfiguration() { @Deprecated @Configuration - @ConditionalOnProperty(name = "collector.receiver.grpc.span.stream.flow-control.type", havingValue = "legacy") + @ConditionalOnProperty(name = "collector.receiver.grpc.span.stream.flow-control.type", havingValue = "legacy", matchIfMissing = true) public static class LegacySpanInterceptorConfiguration { @Bean public FactoryBean grpcSpanStreamScheduler(@Qualifier("grpcSpanStreamProperties") @@ -89,7 +89,7 @@ public FactoryBean spanStreamExecutorInterceptor(@Qualifier(" } @Configuration - @ConditionalOnProperty(name = "collector.receiver.grpc.span.stream.flow-control.type", havingValue = "rate-limit", matchIfMissing = true) + @ConditionalOnProperty(name = "collector.receiver.grpc.span.stream.flow-control.type", havingValue = "rate-limit") public static class RateLimitServerInterceptorConfiguration { @Bean public Bandwidth spanBandwidth(@Value("${collector.receiver.grpc.span.stream.flow-control.rate-limit.capacity:5000}") long capacity, diff --git a/collector/src/main/java/com/navercorp/pinpoint/collector/grpc/config/GrpcStreamProperties.java b/collector/src/main/java/com/navercorp/pinpoint/collector/grpc/config/GrpcStreamProperties.java index 762a22e42b39..7f9a1c8251ff 100644 --- a/collector/src/main/java/com/navercorp/pinpoint/collector/grpc/config/GrpcStreamProperties.java +++ b/collector/src/main/java/com/navercorp/pinpoint/collector/grpc/config/GrpcStreamProperties.java @@ -26,7 +26,7 @@ public class GrpcStreamProperties { @PositiveOrZero private int callInitRequestCount = 1000; @Positive - private int schedulerPeriodMillis = 64; + private int schedulerPeriodMillis = 1000; private int schedulerRecoveryMessageCount = 10; private long idleTimeout = -1; diff --git a/grpc/src/main/java/com/navercorp/pinpoint/grpc/server/flowcontrol/DefaultServerCallWrapper.java b/grpc/src/main/java/com/navercorp/pinpoint/grpc/server/flowcontrol/DefaultServerCallWrapper.java index 75304e6b1be6..38191866d146 100644 --- a/grpc/src/main/java/com/navercorp/pinpoint/grpc/server/flowcontrol/DefaultServerCallWrapper.java +++ b/grpc/src/main/java/com/navercorp/pinpoint/grpc/server/flowcontrol/DefaultServerCallWrapper.java @@ -51,6 +51,11 @@ public void cancel(Status status, Metadata trailers) { this.serverCall.close(status, trailers); } + @Override + public boolean isCancelled() { + return this.serverCall.isCancelled(); + } + @Override public String toString() { return "DefaultServerCallWrapper{" + diff --git a/grpc/src/main/java/com/navercorp/pinpoint/grpc/server/flowcontrol/FlowControlRejectExecutionListener.java b/grpc/src/main/java/com/navercorp/pinpoint/grpc/server/flowcontrol/FlowControlRejectExecutionListener.java index 5baf7a431f14..3035f38217c3 100644 --- a/grpc/src/main/java/com/navercorp/pinpoint/grpc/server/flowcontrol/FlowControlRejectExecutionListener.java +++ b/grpc/src/main/java/com/navercorp/pinpoint/grpc/server/flowcontrol/FlowControlRejectExecutionListener.java @@ -2,8 +2,8 @@ import io.grpc.Metadata; import io.grpc.Status; -import org.apache.logging.log4j.Logger; import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; import java.util.Objects; import java.util.concurrent.Future; @@ -24,6 +24,7 @@ public class FlowControlRejectExecutionListener implements RejectedExecutionList private volatile Future future; + int count ; public FlowControlRejectExecutionListener(String name, ServerCallWrapper serverCall, long recoveryMessagesCount, IdleTimeout idleTimeout) { this.name = Objects.requireNonNull(name, "name"); this.serverCall = Objects.requireNonNull(serverCall, "serverCall"); @@ -38,6 +39,17 @@ public void onRejectedExecution() { @Override public void onSchedule() { + if (logger.isTraceEnabled()) { + logger.trace("Stream state check {} agent:{}/{}", this.name, serverCall.getApplicationName(), serverCall.getAgentId()); + } + if (this.serverCall.isCancelled()) { + logger.info("Stream already cancelled:{} agent:{}/{}", this.name, serverCall.getApplicationName(), serverCall.getAgentId()); + final Future copy = future; + if (copy != null) { + copy.cancel(false); + } + return; + } if (!expireIdleTimeout()) { reject(); } @@ -59,6 +71,9 @@ private void reject() { if (currentRejectCount > 0) { final long recovery = Math.min(currentRejectCount, recoveryMessagesCount); this.rejectedExecutionCounter.addAndGet(-recovery); + if (logger.isDebugEnabled()) { + logger.debug("flow-control request:{} {}/{}", recovery, serverCall.getApplicationName(), serverCall.getAgentId()); + } serverCall.request((int) recovery); } } @@ -108,10 +123,9 @@ private void idleTimeout() { @Override public String toString() { - final StringBuilder sb = new StringBuilder("RejectedExecutionListener{"); - sb.append("rejectedExecutionCounter=").append(rejectedExecutionCounter); - sb.append(", serverCall=").append(serverCall); - sb.append('}'); - return sb.toString(); + return "RejectedExecutionListener{" + + "rejectedExecutionCounter=" + rejectedExecutionCounter + + ", serverCall=" + serverCall + + '}'; } } \ No newline at end of file diff --git a/grpc/src/main/java/com/navercorp/pinpoint/grpc/server/flowcontrol/ServerCallWrapper.java b/grpc/src/main/java/com/navercorp/pinpoint/grpc/server/flowcontrol/ServerCallWrapper.java index b92dcdd7a4b7..812570f276cf 100644 --- a/grpc/src/main/java/com/navercorp/pinpoint/grpc/server/flowcontrol/ServerCallWrapper.java +++ b/grpc/src/main/java/com/navercorp/pinpoint/grpc/server/flowcontrol/ServerCallWrapper.java @@ -15,4 +15,6 @@ public interface ServerCallWrapper { SocketAddress getRemoteAddr(); void cancel(Status status, Metadata trailers); + + boolean isCancelled(); } diff --git a/grpc/src/main/java/com/navercorp/pinpoint/grpc/server/flowcontrol/SimpleRejectedExecutionListener.java b/grpc/src/main/java/com/navercorp/pinpoint/grpc/server/flowcontrol/SimpleRejectedExecutionListener.java index 4789c7115949..973512bfb81c 100644 --- a/grpc/src/main/java/com/navercorp/pinpoint/grpc/server/flowcontrol/SimpleRejectedExecutionListener.java +++ b/grpc/src/main/java/com/navercorp/pinpoint/grpc/server/flowcontrol/SimpleRejectedExecutionListener.java @@ -2,8 +2,8 @@ import io.grpc.Metadata; import io.grpc.Status; -import org.apache.logging.log4j.Logger; import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; import java.util.Objects; import java.util.concurrent.Future; diff --git a/grpc/src/main/java/com/navercorp/pinpoint/grpc/server/flowcontrol/StreamExecutorRejectedExecutionRequestScheduler.java b/grpc/src/main/java/com/navercorp/pinpoint/grpc/server/flowcontrol/StreamExecutorRejectedExecutionRequestScheduler.java index 4e2975ecf532..d8d71ee0d4dc 100644 --- a/grpc/src/main/java/com/navercorp/pinpoint/grpc/server/flowcontrol/StreamExecutorRejectedExecutionRequestScheduler.java +++ b/grpc/src/main/java/com/navercorp/pinpoint/grpc/server/flowcontrol/StreamExecutorRejectedExecutionRequestScheduler.java @@ -66,11 +66,10 @@ public RejectedExecutionListener getRejectedExecutionListener() { @Override public String toString() { - final StringBuilder sb = new StringBuilder("StreamExecutorRejectedExecutionRequestScheduler{"); - sb.append("scheduledExecutorService=").append(scheduledExecutor); - sb.append(", rejectedExecutionListenerFactory=").append(rejectedExecutionListenerFactory); - sb.append('}'); - return sb.toString(); + return "StreamExecutorRejectedExecutionRequestScheduler{" + + "scheduledExecutorService=" + scheduledExecutor + + ", rejectedExecutionListenerFactory=" + rejectedExecutionListenerFactory + + '}'; } public static class Listener { @@ -102,10 +101,9 @@ public void onMessage() { @Override public String toString() { - final StringBuilder sb = new StringBuilder("Listener{"); - sb.append("rejectedExecutionListener=").append(rejectedExecutionListener); - sb.append('}'); - return sb.toString(); + return "Listener{" + + "rejectedExecutionListener=" + rejectedExecutionListener + + '}'; } } diff --git a/grpc/src/main/java/com/navercorp/pinpoint/grpc/server/flowcontrol/StreamExecutorServerInterceptor.java b/grpc/src/main/java/com/navercorp/pinpoint/grpc/server/flowcontrol/StreamExecutorServerInterceptor.java index b7db2a6a897f..90163157f325 100644 --- a/grpc/src/main/java/com/navercorp/pinpoint/grpc/server/flowcontrol/StreamExecutorServerInterceptor.java +++ b/grpc/src/main/java/com/navercorp/pinpoint/grpc/server/flowcontrol/StreamExecutorServerInterceptor.java @@ -30,7 +30,6 @@ import java.util.Objects; import java.util.concurrent.Executor; -import java.util.concurrent.RejectedExecutionException; /** * @author jaehong.kim @@ -86,10 +85,11 @@ public void run() { } }); // scheduleListener.onMessage(); - } catch (RejectedExecutionException ree) { + } catch (Throwable th) { // Defense code, need log ? scheduleListener.onRejectedExecution(); - throttledLogger.info("Failed to request. Rejected execution, count={}", scheduleListener.getRejectedExecutionCount()); + throttledLogger.info("Failed to request. Rejected execution, count={} {}/{}", + scheduleListener.getRejectedExecutionCount(), serverCall.getApplicationName(), serverCall.getAgentId()); } }