From 7841cae62a7485cd6d4c831a10ef8f59022aaba2 Mon Sep 17 00:00:00 2001
From: HzjNeverStop <441627022@qq.com>
Date: Tue, 10 Oct 2023 17:57:16 +0800
Subject: [PATCH] Support auto transform SOFATracer Span in SOFA ThreadPool
(#190)
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit
Co-authored-by: “HzjNeverStop” <“441627022@qq.com”>
---
pom.xml | 9 +-
.../SofaScheduledThreadPoolExecutor.java | 48 +++++++++-
.../common/thread/SofaThreadPoolExecutor.java | 18 +++-
.../thread/SofaThreadPoolTaskExecutor.java | 11 +++
.../thread/SofaThreadPoolTaskScheduler.java | 7 ++
.../thread/SofaTracerCommandFactory.java | 89 +++++++++++++++++++
.../SofaScheduledThreadPoolExecutorTest.java | 79 ++++++++++++++++
.../thread/SofaThreadPoolExecutorTest.java | 46 ++++++++++
.../thread/SofaTracerCommandFactoryTest.java | 61 +++++++++++++
.../common/thread/ThreadPoolTestBase.java | 31 +++++++
...aThreadPoolTaskExecutorConstructsTest.java | 9 ++
...ThreadPoolTaskSchedulerConstructsTest.java | 14 +++
.../thread/sofaThreadPoolTaskExecutor.xml | 1 +
.../thread/sofaThreadPoolTaskScheduler.xml | 1 +
14 files changed, 416 insertions(+), 8 deletions(-)
create mode 100644 src/main/java/com/alipay/sofa/common/thread/SofaTracerCommandFactory.java
create mode 100644 src/test/java/com/alipay/sofa/common/thread/SofaTracerCommandFactoryTest.java
diff --git a/pom.xml b/pom.xml
index 624ac44f..5addb855 100644
--- a/pom.xml
+++ b/pom.xml
@@ -4,7 +4,7 @@
com.alipay.sofa.common
sofa-common-tools
- 2.0.1
+ 2.0.2
jar
${project.groupId}:${project.artifactId}
@@ -30,6 +30,7 @@
3.0.2
4.13.1
27.0-jre
+ 4.0.0
@@ -50,6 +51,12 @@
guava
${guava.version}
+
+ com.alipay.sofa
+ tracer-core
+ ${sofa.tracer.version}
+ true
+
org.slf4j
slf4j-api
diff --git a/src/main/java/com/alipay/sofa/common/thread/SofaScheduledThreadPoolExecutor.java b/src/main/java/com/alipay/sofa/common/thread/SofaScheduledThreadPoolExecutor.java
index dcfb9719..71fcdbbb 100644
--- a/src/main/java/com/alipay/sofa/common/thread/SofaScheduledThreadPoolExecutor.java
+++ b/src/main/java/com/alipay/sofa/common/thread/SofaScheduledThreadPoolExecutor.java
@@ -20,7 +20,9 @@
import com.alipay.sofa.common.thread.space.SpaceNamedThreadFactory;
import com.alipay.sofa.common.utils.StringUtil;
+import java.util.concurrent.Callable;
import java.util.concurrent.RejectedExecutionHandler;
+import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
@@ -32,11 +34,12 @@
* @version SofaScheduledThreadPoolExecutor.java, v 0.1 2020年11月09日 2:19 下午 huzijie Exp $
*/
public class SofaScheduledThreadPoolExecutor extends ScheduledThreadPoolExecutor {
- private static final String SIMPLE_CLASS_NAME = SofaScheduledThreadPoolExecutor.class
- .getSimpleName();
- private static final AtomicInteger POOL_COUNTER = new AtomicInteger(0);
+ private static final String SIMPLE_CLASS_NAME = SofaScheduledThreadPoolExecutor.class
+ .getSimpleName();
+ private static final AtomicInteger POOL_COUNTER = new AtomicInteger(0);
private final ThreadPoolConfig config;
private final ThreadPoolStatistics statistics;
+ private boolean sofaTracerTransmit = false;
/**
* Basic constructor
@@ -193,4 +196,43 @@ public ThreadPoolConfig getConfig() {
public ThreadPoolStatistics getStatistics() {
return statistics;
}
+
+ public ScheduledFuture> schedule(Runnable command, long delay, TimeUnit unit) {
+ if (sofaTracerTransmit) {
+ command = SofaTracerCommandFactory.ofRunnable(command);
+ }
+ return super.schedule(command, delay, unit);
+ }
+
+ public ScheduledFuture schedule(Callable callable, long delay, TimeUnit unit) {
+ if (sofaTracerTransmit) {
+ callable = SofaTracerCommandFactory.ofCallable(callable);
+ }
+ return super.schedule(callable, delay, unit);
+ }
+
+ public ScheduledFuture> scheduleAtFixedRate(Runnable command, long initialDelay, long period,
+ TimeUnit unit) {
+ if (sofaTracerTransmit) {
+ command = SofaTracerCommandFactory.ofRunnable(command);
+ }
+ return super.scheduleAtFixedRate(command, initialDelay, period, unit);
+ }
+
+ public ScheduledFuture> scheduleWithFixedDelay(Runnable command, long initialDelay,
+ long delay, TimeUnit unit) {
+ if (sofaTracerTransmit) {
+ command = SofaTracerCommandFactory.ofRunnable(command);
+ }
+ return super.scheduleWithFixedDelay(command, initialDelay, delay, unit);
+ }
+
+ public void setSofaTracerTransmit(boolean sofaTracerTransmit) {
+ this.sofaTracerTransmit = sofaTracerTransmit;
+ }
+
+ public boolean isSofaTracerTransmit() {
+ return sofaTracerTransmit;
+ }
+
}
diff --git a/src/main/java/com/alipay/sofa/common/thread/SofaThreadPoolExecutor.java b/src/main/java/com/alipay/sofa/common/thread/SofaThreadPoolExecutor.java
index 70eb4cdd..17de9354 100644
--- a/src/main/java/com/alipay/sofa/common/thread/SofaThreadPoolExecutor.java
+++ b/src/main/java/com/alipay/sofa/common/thread/SofaThreadPoolExecutor.java
@@ -29,11 +29,12 @@
* Created on 2020/3/16
*/
public class SofaThreadPoolExecutor extends ThreadPoolExecutor {
- private static final String SIMPLE_CLASS_NAME = SofaThreadPoolExecutor.class
- .getSimpleName();
- private static final AtomicInteger POOL_COUNTER = new AtomicInteger(0);
+ private static final String SIMPLE_CLASS_NAME = SofaThreadPoolExecutor.class
+ .getSimpleName();
+ private static final AtomicInteger POOL_COUNTER = new AtomicInteger(0);
private final ThreadPoolConfig config;
private final ThreadPoolStatistics statistics;
+ private boolean sofaTracerTransmit = false;
/**
* Basic constructor
@@ -142,7 +143,8 @@ public SofaThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAl
@Override
public void execute(Runnable command) {
- ExecutingRunnable runner = new ExecutingRunnable(command);
+ ExecutingRunnable runner = sofaTracerTransmit ? SofaTracerCommandFactory
+ .ofExecutingRunnable(command) : new ExecutingRunnable(command);
runner.setEnqueueTime(System.currentTimeMillis());
super.execute(runner);
}
@@ -220,4 +222,12 @@ public ThreadPoolStatistics getStatistics() {
private String createName() {
return SIMPLE_CLASS_NAME + String.format("%08x", POOL_COUNTER.getAndIncrement());
}
+
+ public void setSofaTracerTransmit(boolean sofaTracerTransmit) {
+ this.sofaTracerTransmit = sofaTracerTransmit;
+ }
+
+ public boolean isSofaTracerTransmit() {
+ return sofaTracerTransmit;
+ }
}
diff --git a/src/main/java/com/alipay/sofa/common/thread/SofaThreadPoolTaskExecutor.java b/src/main/java/com/alipay/sofa/common/thread/SofaThreadPoolTaskExecutor.java
index 5e12a8c2..baf28938 100644
--- a/src/main/java/com/alipay/sofa/common/thread/SofaThreadPoolTaskExecutor.java
+++ b/src/main/java/com/alipay/sofa/common/thread/SofaThreadPoolTaskExecutor.java
@@ -42,6 +42,8 @@ public class SofaThreadPoolTaskExecutor extends ThreadPoolTaskExecutor {
protected long period;
+ protected boolean sofaTracerTransmit;
+
@Override
protected ExecutorService initializeExecutor(ThreadFactory threadFactory,
RejectedExecutionHandler rejectedExecutionHandler) {
@@ -73,6 +75,7 @@ public void execute(Runnable command) {
rejectedExecutionHandler, threadPoolName, spaceName, taskTimeout, period,
TimeUnit.MILLISECONDS);
}
+ executor.setSofaTracerTransmit(sofaTracerTransmit);
Boolean allowCoreThreadTimeOut = ClassUtil.getField("allowCoreThreadTimeOut", this);
if (allowCoreThreadTimeOut) {
@@ -144,4 +147,12 @@ public TimeUnit getTimeUnit() {
}
return sofaThreadPoolExecutor.getConfig().getTimeUnit();
}
+
+ public boolean isSofaTracerTransmit() {
+ return sofaTracerTransmit;
+ }
+
+ public void setSofaTracerTransmit(boolean sofaTracerTransmit) {
+ this.sofaTracerTransmit = sofaTracerTransmit;
+ }
}
diff --git a/src/main/java/com/alipay/sofa/common/thread/SofaThreadPoolTaskScheduler.java b/src/main/java/com/alipay/sofa/common/thread/SofaThreadPoolTaskScheduler.java
index d927c09c..668ec4d5 100644
--- a/src/main/java/com/alipay/sofa/common/thread/SofaThreadPoolTaskScheduler.java
+++ b/src/main/java/com/alipay/sofa/common/thread/SofaThreadPoolTaskScheduler.java
@@ -43,6 +43,8 @@ public class SofaThreadPoolTaskScheduler extends ThreadPoolTaskScheduler {
protected long period;
+ protected boolean sofaTracerTransmit;
+
@Override
protected ExecutorService initializeExecutor(ThreadFactory threadFactory,
RejectedExecutionHandler rejectedExecutionHandler) {
@@ -54,6 +56,7 @@ protected ExecutorService initializeExecutor(ThreadFactory threadFactory,
SofaScheduledThreadPoolExecutor executor = new SofaScheduledThreadPoolExecutor(
getPoolSize(), threadFactory, rejectedExecutionHandler, threadPoolName, spaceName,
taskTimeout, period, TimeUnit.MILLISECONDS);
+ executor.setSofaTracerTransmit(sofaTracerTransmit);
Boolean removeOnCancelPolicy = ClassUtil.getField("removeOnCancelPolicy", this);
if (removeOnCancelPolicy) {
@@ -126,4 +129,8 @@ public TimeUnit getTimeUnit() {
}
return sofaScheduledThreadPoolExecutor.getConfig().getTimeUnit();
}
+
+ public void setSofaTracerTransmit(boolean sofaTracerTransmit) {
+ this.sofaTracerTransmit = sofaTracerTransmit;
+ }
}
diff --git a/src/main/java/com/alipay/sofa/common/thread/SofaTracerCommandFactory.java b/src/main/java/com/alipay/sofa/common/thread/SofaTracerCommandFactory.java
new file mode 100644
index 00000000..ebffaf4e
--- /dev/null
+++ b/src/main/java/com/alipay/sofa/common/thread/SofaTracerCommandFactory.java
@@ -0,0 +1,89 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.alipay.sofa.common.thread;
+
+import com.alipay.common.tracer.core.async.SofaTracerCallable;
+import com.alipay.common.tracer.core.async.SofaTracerRunnable;
+import com.alipay.sofa.common.utils.ClassUtil;
+
+import java.util.concurrent.Callable;
+
+/**
+ * Factory to create SOFA-Tracer work command.
+ * @author huzijie
+ * @version SofaTracerCommandFactory.java, v 0.1 2023年09月26日 2:53 PM huzijie Exp $
+ */
+public class SofaTracerCommandFactory {
+
+ private static final String SOFA_TRACER_RUNNABLE_CLASS_NAME = "com.alipay.common.tracer.core.async.SofaTracerRunnable";
+ private static final boolean SOFA_TRACER_CLASS_PRESENT = ClassUtil
+ .isPresent(
+ SOFA_TRACER_RUNNABLE_CLASS_NAME,
+ SofaTracerCommandFactory.class
+ .getClassLoader());
+
+ static ExecutingRunnable ofExecutingRunnable(Runnable runnable) {
+ if (!SOFA_TRACER_CLASS_PRESENT) {
+ return new ExecutingRunnable(runnable);
+ }
+ return new SofaTracerCommandFactory.SofaTracerExecutingRunnable(runnable);
+ }
+
+ static Runnable ofRunnable(Runnable runnable) {
+ if (!SOFA_TRACER_CLASS_PRESENT) {
+ return runnable;
+ }
+ if (runnable instanceof SofaTracerRunnable) {
+ return runnable;
+ }
+ return new SofaTracerRunnable(runnable);
+ }
+
+ static Callable ofCallable(Callable callable) {
+ if (!SOFA_TRACER_CLASS_PRESENT) {
+ return callable;
+ }
+ if (callable instanceof SofaTracerCallable) {
+ return callable;
+ }
+ return new SofaTracerCallable<>(callable);
+ }
+
+ /**
+ * The wrapper to the {@link ExecutingRunnable} to transmit SofaTracerSpan.
+ * @author huzijie
+ * @version SofaTracerExecutingRunnable.java, v 0.1 2023年09月26日 11:45 AM huzijie Exp $
+ */
+ public static class SofaTracerExecutingRunnable extends ExecutingRunnable {
+
+ private final SofaTracerRunnable sofaTracerRunnable;
+
+ public SofaTracerExecutingRunnable(Runnable originRunnable) {
+ super(originRunnable);
+ if (originRunnable instanceof SofaTracerRunnable) {
+ this.sofaTracerRunnable = (SofaTracerRunnable) originRunnable;
+ } else {
+ this.sofaTracerRunnable = new SofaTracerRunnable(originRunnable);
+ }
+ }
+
+ @Override
+ public void run() {
+ sofaTracerRunnable.run();
+ }
+ }
+}
diff --git a/src/test/java/com/alipay/sofa/common/thread/SofaScheduledThreadPoolExecutorTest.java b/src/test/java/com/alipay/sofa/common/thread/SofaScheduledThreadPoolExecutorTest.java
index 5732d614..d8623509 100644
--- a/src/test/java/com/alipay/sofa/common/thread/SofaScheduledThreadPoolExecutorTest.java
+++ b/src/test/java/com/alipay/sofa/common/thread/SofaScheduledThreadPoolExecutorTest.java
@@ -21,8 +21,10 @@
import org.junit.Before;
import org.junit.Test;
+import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
/**
* @author huzijie
@@ -135,4 +137,81 @@ public void testLoggingBurst() throws Exception {
Assert.assertEquals(numThreads, aberrantListAppender.list.size());
Assert.assertTrue(isLastInfoMatch("Thread pool with name '\\S+' unregistered"));
}
+
+ @Test
+ public void testNoTracerTransmit() throws InterruptedException {
+ AtomicInteger success = new AtomicInteger(0);
+ CountDownLatch countDownLatch = new CountDownLatch(1);
+ threadPool.schedule(() -> {
+ try {
+ assertTraceSpanNotExist();
+ success.incrementAndGet();
+ } finally {
+ countDownLatch.countDown();
+ }
+ }, 10, TimeUnit.MILLISECONDS);
+ countDownLatch.await();
+ Assert.assertEquals(success.get(), 1);
+ }
+
+ @Test
+ public void testEnableTracerTransmit() throws InterruptedException {
+ threadPool.setSofaTracerTransmit(true);
+
+ AtomicInteger fail = new AtomicInteger(0);
+ CountDownLatch countDownLatch = new CountDownLatch(1);
+ threadPool.schedule(() -> {
+ try {
+ assertTraceSpanExist();
+ } catch (Throwable t) {
+ fail.incrementAndGet();
+ } finally {
+ countDownLatch.countDown();
+ }
+ }, 10, TimeUnit.MILLISECONDS);
+ Assert.assertTrue(countDownLatch.await(20, TimeUnit.MILLISECONDS));
+ Assert.assertEquals(fail.get(), 0);
+
+ fail.set(0);
+ threadPool.schedule(() -> {
+ try {
+ return assertTraceSpanExist();
+ } catch (Throwable t) {
+ fail.incrementAndGet();
+ return null;
+ } finally {
+ countDownLatch.countDown();
+ }
+ }, 10, TimeUnit.MILLISECONDS);
+ Assert.assertTrue(countDownLatch.await(20, TimeUnit.MILLISECONDS));
+ Assert.assertEquals(fail.get(), 0);
+
+ fail.set(0);
+ CountDownLatch fixRateCountDownLatch = new CountDownLatch(2);
+ threadPool.scheduleAtFixedRate(() -> {
+ try {
+ assertTraceSpanExist();
+ } catch (Throwable t) {
+ fail.incrementAndGet();
+ } finally {
+ fixRateCountDownLatch.countDown();
+ }
+ }, 10, 10, TimeUnit.MILLISECONDS);
+ Assert.assertTrue(fixRateCountDownLatch.await(30, TimeUnit.MILLISECONDS));
+ Assert.assertEquals(fail.get(), 0);
+
+ fail.set(0);
+ CountDownLatch fixDelayCountDownLatch = new CountDownLatch(2);
+ threadPool.scheduleWithFixedDelay(() -> {
+ try {
+ assertTraceSpanExist();
+ } catch (Throwable t) {
+ fail.incrementAndGet();
+ } finally {
+ fixDelayCountDownLatch.countDown();
+ }
+ }, 10, 10, TimeUnit.MILLISECONDS);
+ Assert.assertTrue(fixDelayCountDownLatch.await(30, TimeUnit.MILLISECONDS));
+ Assert.assertEquals(fail.get(), 0);
+ }
}
diff --git a/src/test/java/com/alipay/sofa/common/thread/SofaThreadPoolExecutorTest.java b/src/test/java/com/alipay/sofa/common/thread/SofaThreadPoolExecutorTest.java
index 4407b8b8..9ad76cf9 100644
--- a/src/test/java/com/alipay/sofa/common/thread/SofaThreadPoolExecutorTest.java
+++ b/src/test/java/com/alipay/sofa/common/thread/SofaThreadPoolExecutorTest.java
@@ -22,7 +22,9 @@
import org.junit.Test;
import java.util.concurrent.ArrayBlockingQueue;
+import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
/**
* @author Alaneuler
@@ -173,4 +175,48 @@ public void testLoggingBurst() throws Exception {
Assert.assertEquals(numThreads, aberrantListAppender.list.size());
Assert.assertTrue(isLastInfoMatch("Thread pool with name '\\S+' unregistered"));
}
+
+ @Test
+ public void testNoTracerTransmit() throws InterruptedException {
+ AtomicInteger success = new AtomicInteger(0);
+ CountDownLatch countDownLatch = new CountDownLatch(1);
+ threadPool.execute(() -> {
+ try {
+ assertTraceSpanNotExist();
+ success.incrementAndGet();
+ } finally {
+ countDownLatch.countDown();
+ }
+ });
+ Assert.assertTrue(countDownLatch.await(20, TimeUnit.MILLISECONDS));
+ Assert.assertEquals(success.get(), 1);
+ }
+
+ @Test
+ public void testEnableTracerTransmit() throws InterruptedException {
+ threadPool.setSofaTracerTransmit(true);
+
+ AtomicInteger success = new AtomicInteger(0);
+ CountDownLatch countDownLatch = new CountDownLatch(2);
+ threadPool.execute(() -> {
+ try {
+ assertTraceSpanExist();
+ success.incrementAndGet();
+ } finally {
+ countDownLatch.countDown();
+ }
+ });
+
+ threadPool.submit(() -> {
+ try {
+ String id = assertTraceSpanExist();
+ success.incrementAndGet();
+ return id;
+ } finally {
+ countDownLatch.countDown();
+ }
+ });
+ Assert.assertTrue(countDownLatch.await(20, TimeUnit.MILLISECONDS));
+ Assert.assertEquals(success.get(), 2);
+ }
}
diff --git a/src/test/java/com/alipay/sofa/common/thread/SofaTracerCommandFactoryTest.java b/src/test/java/com/alipay/sofa/common/thread/SofaTracerCommandFactoryTest.java
new file mode 100644
index 00000000..c2be63e7
--- /dev/null
+++ b/src/test/java/com/alipay/sofa/common/thread/SofaTracerCommandFactoryTest.java
@@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.alipay.sofa.common.thread;
+
+import com.alipay.common.tracer.core.async.SofaTracerCallable;
+import com.alipay.common.tracer.core.async.SofaTracerRunnable;
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.util.concurrent.Callable;
+
+/**
+ * @author huzijie
+ * @version SofaTracerCommandFactoryTest.java, v 0.1 2023年09月26日 3:13 PM huzijie Exp $
+ */
+public class SofaTracerCommandFactoryTest {
+
+ @Test
+ public void ofExecutingRunnable() {
+ Runnable runnable = () -> {};
+ Assert.assertTrue(SofaTracerCommandFactory.ofExecutingRunnable(runnable) instanceof SofaTracerCommandFactory.SofaTracerExecutingRunnable);
+ }
+
+ @Test
+ public void ofRunnable() {
+ Runnable runnable = () -> {};
+ Runnable newRunnable = SofaTracerCommandFactory.ofRunnable(runnable);
+ Assert.assertTrue(newRunnable instanceof SofaTracerRunnable);
+ Assert.assertNotEquals(runnable, newRunnable);
+
+ Runnable duplicateWrapRunnable = SofaTracerCommandFactory.ofRunnable(newRunnable);
+ Assert.assertTrue(duplicateWrapRunnable instanceof SofaTracerRunnable);
+ Assert.assertEquals(duplicateWrapRunnable, newRunnable);
+ }
+
+ @Test
+ public void ofCallable() {
+ Callable callable = () -> null;
+ Callable newCallable = SofaTracerCommandFactory.ofCallable(callable);
+ Assert.assertTrue(newCallable instanceof SofaTracerCallable);
+ Assert.assertNotEquals(callable, newCallable);
+
+ Callable duplicateWrapCallable = SofaTracerCommandFactory.ofCallable(newCallable);
+ Assert.assertTrue(duplicateWrapCallable instanceof SofaTracerCallable);
+ Assert.assertEquals(duplicateWrapCallable, duplicateWrapCallable);
+ }
+}
diff --git a/src/test/java/com/alipay/sofa/common/thread/ThreadPoolTestBase.java b/src/test/java/com/alipay/sofa/common/thread/ThreadPoolTestBase.java
index 6b539c51..d3f323c9 100644
--- a/src/test/java/com/alipay/sofa/common/thread/ThreadPoolTestBase.java
+++ b/src/test/java/com/alipay/sofa/common/thread/ThreadPoolTestBase.java
@@ -19,9 +19,15 @@
import ch.qos.logback.classic.Logger;
import ch.qos.logback.classic.spi.ILoggingEvent;
import ch.qos.logback.core.read.ListAppender;
+import com.alipay.common.tracer.core.SofaTracer;
+import com.alipay.common.tracer.core.context.trace.SofaTraceContext;
+import com.alipay.common.tracer.core.holder.SofaTraceContextHolder;
+import com.alipay.common.tracer.core.mock.MockSofaTracer;
+import com.alipay.common.tracer.core.span.SofaTracerSpan;
import com.alipay.sofa.common.log.Constants;
import com.alipay.sofa.common.thread.log.ThreadLogger;
import org.junit.After;
+import org.junit.Assert;
import org.junit.Before;
import java.lang.reflect.Field;
@@ -53,6 +59,8 @@ public void beforeTest() {
aberrantListAppender.start();
((Logger) ThreadLogger.INFO_THREAD_LOGGER).addAppender(infoListAppender);
((Logger) ThreadLogger.WARN_THREAD_LOGGER).addAppender(aberrantListAppender);
+
+ initTracerContext();
}
@After
@@ -152,4 +160,27 @@ public String call() throws Exception {
return "sleepCallableTask";
}
}
+
+ protected void initTracerContext() {
+ // clear
+ SofaTraceContext sofaTraceContext = SofaTraceContextHolder.getSofaTraceContext();
+ sofaTraceContext.clear();
+
+ // init tracer
+ SofaTracer sofaTracer = MockSofaTracer.getMockSofaTracer();
+ SofaTracerSpan span = (SofaTracerSpan) sofaTracer.buildSpan("test").start();
+ sofaTraceContext.push(span);
+ }
+
+ protected String assertTraceSpanExist() {
+ SofaTracerSpan tracerSpan = SofaTraceContextHolder.getSofaTraceContext().getCurrentSpan();
+ String traceId = tracerSpan.getSofaTracerSpanContext().getTraceId();
+ Assert.assertNotNull(traceId);
+ return traceId;
+ }
+
+ protected void assertTraceSpanNotExist() {
+ SofaTracerSpan tracerSpan = SofaTraceContextHolder.getSofaTraceContext().getCurrentSpan();
+ Assert.assertNull(tracerSpan);
+ }
}
diff --git a/src/test/java/com/alipay/sofa/common/thread/construct/SofaThreadPoolTaskExecutorConstructsTest.java b/src/test/java/com/alipay/sofa/common/thread/construct/SofaThreadPoolTaskExecutorConstructsTest.java
index a875b5f9..fb5820ee 100644
--- a/src/test/java/com/alipay/sofa/common/thread/construct/SofaThreadPoolTaskExecutorConstructsTest.java
+++ b/src/test/java/com/alipay/sofa/common/thread/construct/SofaThreadPoolTaskExecutorConstructsTest.java
@@ -16,6 +16,7 @@
*/
package com.alipay.sofa.common.thread.construct;
+import com.alipay.sofa.common.thread.SofaThreadPoolExecutor;
import com.alipay.sofa.common.thread.SofaThreadPoolTaskExecutor;
import com.alipay.sofa.common.thread.bean.TestTaskDecorator;
import org.junit.Assert;
@@ -31,6 +32,7 @@
import org.springframework.test.context.junit4.SpringJUnit4ClassRunner;
import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ThreadPoolExecutor;
/**
* @author huzijie
@@ -60,6 +62,9 @@ public void testSofaThreadPoolTaskExecutorA() throws ExecutionException, Interru
Assert.assertEquals(2000, ((SofaThreadPoolTaskExecutor) sofaThreadPoolTaskExecutorA).getTaskTimeout());
Assert.assertEquals(10000, ((SofaThreadPoolTaskExecutor) sofaThreadPoolTaskExecutorA).getPeriod());
Assert.assertTrue(sofaThreadPoolTaskExecutorA.getThreadPoolExecutor().allowsCoreThreadTimeOut());
+ ThreadPoolExecutor sofaThreadPoolExecutor = sofaThreadPoolTaskExecutorA.getThreadPoolExecutor();
+ Assert.assertTrue(sofaThreadPoolExecutor instanceof SofaThreadPoolExecutor);
+ Assert.assertTrue(((SofaThreadPoolExecutor) sofaThreadPoolExecutor).isSofaTracerTransmit());
TestTaskDecorator.clearCount();
sofaThreadPoolTaskExecutorA.submit(() -> { }).get();
Assert.assertEquals(1, TestTaskDecorator.count.get());
@@ -77,6 +82,9 @@ public void testSofaThreadPoolTaskExecutorB() throws ExecutionException, Interru
Assert.assertEquals("testSpaceB", ((SofaThreadPoolTaskExecutor) sofaThreadPoolTaskExecutorB).getSpaceName());
Assert.assertEquals(3000, ((SofaThreadPoolTaskExecutor) sofaThreadPoolTaskExecutorB).getTaskTimeout());
Assert.assertEquals(5000, ((SofaThreadPoolTaskExecutor) sofaThreadPoolTaskExecutorB).getPeriod());
+ ThreadPoolExecutor sofaThreadPoolExecutor = sofaThreadPoolTaskExecutorB.getThreadPoolExecutor();
+ Assert.assertTrue(sofaThreadPoolExecutor instanceof SofaThreadPoolExecutor);
+ Assert.assertTrue(((SofaThreadPoolExecutor) sofaThreadPoolExecutor).isSofaTracerTransmit());
TestTaskDecorator.clearCount();
sofaThreadPoolTaskExecutorB.submit(() -> { }).get();
Assert.assertEquals(1, TestTaskDecorator.count.get());
@@ -99,6 +107,7 @@ public ThreadPoolTaskExecutor testSofaThreadPoolTaskExecutorB(TaskDecorator task
sofaThreadPoolTaskExecutor.setPeriod(5000);
sofaThreadPoolTaskExecutor.setAllowCoreThreadTimeOut(true);
sofaThreadPoolTaskExecutor.setTaskDecorator(taskDecorator);
+ sofaThreadPoolTaskExecutor.setSofaTracerTransmit(true);
return sofaThreadPoolTaskExecutor;
}
}
diff --git a/src/test/java/com/alipay/sofa/common/thread/construct/SofaThreadPoolTaskSchedulerConstructsTest.java b/src/test/java/com/alipay/sofa/common/thread/construct/SofaThreadPoolTaskSchedulerConstructsTest.java
index 6b3e60a5..912a9ed0 100644
--- a/src/test/java/com/alipay/sofa/common/thread/construct/SofaThreadPoolTaskSchedulerConstructsTest.java
+++ b/src/test/java/com/alipay/sofa/common/thread/construct/SofaThreadPoolTaskSchedulerConstructsTest.java
@@ -16,6 +16,7 @@
*/
package com.alipay.sofa.common.thread.construct;
+import com.alipay.sofa.common.thread.SofaScheduledThreadPoolExecutor;
import com.alipay.sofa.common.thread.SofaThreadPoolTaskScheduler;
import org.junit.Assert;
import org.junit.Test;
@@ -28,6 +29,8 @@
import org.springframework.test.context.ContextConfiguration;
import org.springframework.test.context.junit4.SpringJUnit4ClassRunner;
+import java.util.concurrent.ScheduledExecutorService;
+
/**
* @author huzijie
* @version SofaThreadTaskSchedulerConstructsTest.java, v 0.1 2020年11月09日 5:35 下午 huzijie Exp $
@@ -61,6 +64,11 @@ public void testSofaThreadPoolTaskSchedulerA() {
((SofaThreadPoolTaskScheduler) sofaThreadPoolTaskSchedulerA).getPeriod());
Assert.assertTrue(sofaThreadPoolTaskSchedulerA.getScheduledThreadPoolExecutor()
.getRemoveOnCancelPolicy());
+ ScheduledExecutorService scheduledExecutorService = sofaThreadPoolTaskSchedulerA
+ .getScheduledExecutor();
+ Assert.assertTrue(scheduledExecutorService instanceof SofaScheduledThreadPoolExecutor);
+ Assert.assertTrue(((SofaScheduledThreadPoolExecutor) scheduledExecutorService)
+ .isSofaTracerTransmit());
sofaThreadPoolTaskSchedulerA.shutdown();
}
@@ -81,6 +89,11 @@ public void testSofaThreadPoolTaskSchedulerB() {
((SofaThreadPoolTaskScheduler) sofaThreadPoolTaskSchedulerB).getPeriod());
Assert.assertTrue(sofaThreadPoolTaskSchedulerB.getScheduledThreadPoolExecutor()
.getRemoveOnCancelPolicy());
+ ScheduledExecutorService scheduledExecutorService = sofaThreadPoolTaskSchedulerB
+ .getScheduledExecutor();
+ Assert.assertTrue(scheduledExecutorService instanceof SofaScheduledThreadPoolExecutor);
+ Assert.assertTrue(((SofaScheduledThreadPoolExecutor) scheduledExecutorService)
+ .isSofaTracerTransmit());
sofaThreadPoolTaskSchedulerB.shutdown();
}
@@ -97,6 +110,7 @@ public SofaThreadPoolTaskScheduler testSofaThreadPoolTaskSchedulerB() {
sofaThreadPoolTaskScheduler.setTaskTimeout(3000);
sofaThreadPoolTaskScheduler.setPeriod(5000);
sofaThreadPoolTaskScheduler.setRemoveOnCancelPolicy(true);
+ sofaThreadPoolTaskScheduler.setSofaTracerTransmit(true);
return sofaThreadPoolTaskScheduler;
}
}
diff --git a/src/test/resources/thread/sofaThreadPoolTaskExecutor.xml b/src/test/resources/thread/sofaThreadPoolTaskExecutor.xml
index 4967447e..ce741bf3 100644
--- a/src/test/resources/thread/sofaThreadPoolTaskExecutor.xml
+++ b/src/test/resources/thread/sofaThreadPoolTaskExecutor.xml
@@ -19,6 +19,7 @@
+
diff --git a/src/test/resources/thread/sofaThreadPoolTaskScheduler.xml b/src/test/resources/thread/sofaThreadPoolTaskScheduler.xml
index e814f275..0c923a5b 100644
--- a/src/test/resources/thread/sofaThreadPoolTaskScheduler.xml
+++ b/src/test/resources/thread/sofaThreadPoolTaskScheduler.xml
@@ -15,6 +15,7 @@
+