Skip to content

Commit

Permalink
Support auto transform SOFATracer Span in SOFA ThreadPool (#190)
Browse files Browse the repository at this point in the history
Co-authored-by: “HzjNeverStop” <“441627022@qq.com”>
  • Loading branch information
HzjNeverStop and “HzjNeverStop” authored Oct 10, 2023
1 parent e0ff800 commit 7841cae
Show file tree
Hide file tree
Showing 14 changed files with 416 additions and 8 deletions.
9 changes: 8 additions & 1 deletion pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@

<groupId>com.alipay.sofa.common</groupId>
<artifactId>sofa-common-tools</artifactId>
<version>2.0.1</version>
<version>2.0.2</version>
<packaging>jar</packaging>

<name>${project.groupId}:${project.artifactId}</name>
Expand All @@ -30,6 +30,7 @@
<spring.boot.version>3.0.2</spring.boot.version>
<junit.version>4.13.1</junit.version>
<guava.version>27.0-jre</guava.version>
<sofa.tracer.version>4.0.0</sofa.tracer.version>
</properties>

<dependencyManagement>
Expand All @@ -50,6 +51,12 @@
<artifactId>guava</artifactId>
<version>${guava.version}</version>
</dependency>
<dependency>
<groupId>com.alipay.sofa</groupId>
<artifactId>tracer-core</artifactId>
<version>${sofa.tracer.version}</version>
<optional>true</optional>
</dependency>
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-api</artifactId>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,9 @@
import com.alipay.sofa.common.thread.space.SpaceNamedThreadFactory;
import com.alipay.sofa.common.utils.StringUtil;

import java.util.concurrent.Callable;
import java.util.concurrent.RejectedExecutionHandler;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
Expand All @@ -32,11 +34,12 @@
* @version SofaScheduledThreadPoolExecutor.java, v 0.1 2020年11月09日 2:19 下午 huzijie Exp $
*/
public class SofaScheduledThreadPoolExecutor extends ScheduledThreadPoolExecutor {
private static final String SIMPLE_CLASS_NAME = SofaScheduledThreadPoolExecutor.class
.getSimpleName();
private static final AtomicInteger POOL_COUNTER = new AtomicInteger(0);
private static final String SIMPLE_CLASS_NAME = SofaScheduledThreadPoolExecutor.class
.getSimpleName();
private static final AtomicInteger POOL_COUNTER = new AtomicInteger(0);
private final ThreadPoolConfig config;
private final ThreadPoolStatistics statistics;
private boolean sofaTracerTransmit = false;

/**
* Basic constructor
Expand Down Expand Up @@ -193,4 +196,43 @@ public ThreadPoolConfig getConfig() {
public ThreadPoolStatistics getStatistics() {
return statistics;
}

public ScheduledFuture<?> schedule(Runnable command, long delay, TimeUnit unit) {
if (sofaTracerTransmit) {
command = SofaTracerCommandFactory.ofRunnable(command);
}
return super.schedule(command, delay, unit);
}

public <V> ScheduledFuture<V> schedule(Callable<V> callable, long delay, TimeUnit unit) {
if (sofaTracerTransmit) {
callable = SofaTracerCommandFactory.ofCallable(callable);
}
return super.schedule(callable, delay, unit);
}

public ScheduledFuture<?> scheduleAtFixedRate(Runnable command, long initialDelay, long period,
TimeUnit unit) {
if (sofaTracerTransmit) {
command = SofaTracerCommandFactory.ofRunnable(command);
}
return super.scheduleAtFixedRate(command, initialDelay, period, unit);
}

public ScheduledFuture<?> scheduleWithFixedDelay(Runnable command, long initialDelay,
long delay, TimeUnit unit) {
if (sofaTracerTransmit) {
command = SofaTracerCommandFactory.ofRunnable(command);
}
return super.scheduleWithFixedDelay(command, initialDelay, delay, unit);
}

public void setSofaTracerTransmit(boolean sofaTracerTransmit) {
this.sofaTracerTransmit = sofaTracerTransmit;
}

public boolean isSofaTracerTransmit() {
return sofaTracerTransmit;
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -29,11 +29,12 @@
* Created on 2020/3/16
*/
public class SofaThreadPoolExecutor extends ThreadPoolExecutor {
private static final String SIMPLE_CLASS_NAME = SofaThreadPoolExecutor.class
.getSimpleName();
private static final AtomicInteger POOL_COUNTER = new AtomicInteger(0);
private static final String SIMPLE_CLASS_NAME = SofaThreadPoolExecutor.class
.getSimpleName();
private static final AtomicInteger POOL_COUNTER = new AtomicInteger(0);
private final ThreadPoolConfig config;
private final ThreadPoolStatistics statistics;
private boolean sofaTracerTransmit = false;

/**
* Basic constructor
Expand Down Expand Up @@ -142,7 +143,8 @@ public SofaThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAl

@Override
public void execute(Runnable command) {
ExecutingRunnable runner = new ExecutingRunnable(command);
ExecutingRunnable runner = sofaTracerTransmit ? SofaTracerCommandFactory
.ofExecutingRunnable(command) : new ExecutingRunnable(command);
runner.setEnqueueTime(System.currentTimeMillis());
super.execute(runner);
}
Expand Down Expand Up @@ -220,4 +222,12 @@ public ThreadPoolStatistics getStatistics() {
private String createName() {
return SIMPLE_CLASS_NAME + String.format("%08x", POOL_COUNTER.getAndIncrement());
}

public void setSofaTracerTransmit(boolean sofaTracerTransmit) {
this.sofaTracerTransmit = sofaTracerTransmit;
}

public boolean isSofaTracerTransmit() {
return sofaTracerTransmit;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,8 @@ public class SofaThreadPoolTaskExecutor extends ThreadPoolTaskExecutor {

protected long period;

protected boolean sofaTracerTransmit;

@Override
protected ExecutorService initializeExecutor(ThreadFactory threadFactory,
RejectedExecutionHandler rejectedExecutionHandler) {
Expand Down Expand Up @@ -73,6 +75,7 @@ public void execute(Runnable command) {
rejectedExecutionHandler, threadPoolName, spaceName, taskTimeout, period,
TimeUnit.MILLISECONDS);
}
executor.setSofaTracerTransmit(sofaTracerTransmit);

Boolean allowCoreThreadTimeOut = ClassUtil.getField("allowCoreThreadTimeOut", this);
if (allowCoreThreadTimeOut) {
Expand Down Expand Up @@ -144,4 +147,12 @@ public TimeUnit getTimeUnit() {
}
return sofaThreadPoolExecutor.getConfig().getTimeUnit();
}

public boolean isSofaTracerTransmit() {
return sofaTracerTransmit;
}

public void setSofaTracerTransmit(boolean sofaTracerTransmit) {
this.sofaTracerTransmit = sofaTracerTransmit;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,8 @@ public class SofaThreadPoolTaskScheduler extends ThreadPoolTaskScheduler {

protected long period;

protected boolean sofaTracerTransmit;

@Override
protected ExecutorService initializeExecutor(ThreadFactory threadFactory,
RejectedExecutionHandler rejectedExecutionHandler) {
Expand All @@ -54,6 +56,7 @@ protected ExecutorService initializeExecutor(ThreadFactory threadFactory,
SofaScheduledThreadPoolExecutor executor = new SofaScheduledThreadPoolExecutor(
getPoolSize(), threadFactory, rejectedExecutionHandler, threadPoolName, spaceName,
taskTimeout, period, TimeUnit.MILLISECONDS);
executor.setSofaTracerTransmit(sofaTracerTransmit);

Boolean removeOnCancelPolicy = ClassUtil.getField("removeOnCancelPolicy", this);
if (removeOnCancelPolicy) {
Expand Down Expand Up @@ -126,4 +129,8 @@ public TimeUnit getTimeUnit() {
}
return sofaScheduledThreadPoolExecutor.getConfig().getTimeUnit();
}

public void setSofaTracerTransmit(boolean sofaTracerTransmit) {
this.sofaTracerTransmit = sofaTracerTransmit;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,89 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.alipay.sofa.common.thread;

import com.alipay.common.tracer.core.async.SofaTracerCallable;
import com.alipay.common.tracer.core.async.SofaTracerRunnable;
import com.alipay.sofa.common.utils.ClassUtil;

import java.util.concurrent.Callable;

/**
* Factory to create SOFA-Tracer work command.
* @author huzijie
* @version SofaTracerCommandFactory.java, v 0.1 2023年09月26日 2:53 PM huzijie Exp $
*/
public class SofaTracerCommandFactory {

private static final String SOFA_TRACER_RUNNABLE_CLASS_NAME = "com.alipay.common.tracer.core.async.SofaTracerRunnable";
private static final boolean SOFA_TRACER_CLASS_PRESENT = ClassUtil
.isPresent(
SOFA_TRACER_RUNNABLE_CLASS_NAME,
SofaTracerCommandFactory.class
.getClassLoader());

static ExecutingRunnable ofExecutingRunnable(Runnable runnable) {
if (!SOFA_TRACER_CLASS_PRESENT) {
return new ExecutingRunnable(runnable);
}
return new SofaTracerCommandFactory.SofaTracerExecutingRunnable(runnable);
}

static Runnable ofRunnable(Runnable runnable) {
if (!SOFA_TRACER_CLASS_PRESENT) {
return runnable;
}
if (runnable instanceof SofaTracerRunnable) {
return runnable;
}
return new SofaTracerRunnable(runnable);
}

static <V> Callable<V> ofCallable(Callable<V> callable) {
if (!SOFA_TRACER_CLASS_PRESENT) {
return callable;
}
if (callable instanceof SofaTracerCallable) {
return callable;
}
return new SofaTracerCallable<>(callable);
}

/**
* The wrapper to the {@link ExecutingRunnable} to transmit SofaTracerSpan.
* @author huzijie
* @version SofaTracerExecutingRunnable.java, v 0.1 2023年09月26日 11:45 AM huzijie Exp $
*/
public static class SofaTracerExecutingRunnable extends ExecutingRunnable {

private final SofaTracerRunnable sofaTracerRunnable;

public SofaTracerExecutingRunnable(Runnable originRunnable) {
super(originRunnable);
if (originRunnable instanceof SofaTracerRunnable) {
this.sofaTracerRunnable = (SofaTracerRunnable) originRunnable;
} else {
this.sofaTracerRunnable = new SofaTracerRunnable(originRunnable);
}
}

@Override
public void run() {
sofaTracerRunnable.run();
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -21,8 +21,10 @@
import org.junit.Before;
import org.junit.Test;

import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

/**
* @author huzijie
Expand Down Expand Up @@ -135,4 +137,81 @@ public void testLoggingBurst() throws Exception {
Assert.assertEquals(numThreads, aberrantListAppender.list.size());
Assert.assertTrue(isLastInfoMatch("Thread pool with name '\\S+' unregistered"));
}

@Test
public void testNoTracerTransmit() throws InterruptedException {
AtomicInteger success = new AtomicInteger(0);
CountDownLatch countDownLatch = new CountDownLatch(1);
threadPool.schedule(() -> {
try {
assertTraceSpanNotExist();
success.incrementAndGet();
} finally {
countDownLatch.countDown();
}
}, 10, TimeUnit.MILLISECONDS);
countDownLatch.await();
Assert.assertEquals(success.get(), 1);
}

@Test
public void testEnableTracerTransmit() throws InterruptedException {
threadPool.setSofaTracerTransmit(true);

AtomicInteger fail = new AtomicInteger(0);
CountDownLatch countDownLatch = new CountDownLatch(1);
threadPool.schedule(() -> {
try {
assertTraceSpanExist();
} catch (Throwable t) {
fail.incrementAndGet();
} finally {
countDownLatch.countDown();
}
}, 10, TimeUnit.MILLISECONDS);
Assert.assertTrue(countDownLatch.await(20, TimeUnit.MILLISECONDS));
Assert.assertEquals(fail.get(), 0);

fail.set(0);
threadPool.schedule(() -> {
try {
return assertTraceSpanExist();
} catch (Throwable t) {
fail.incrementAndGet();
return null;
} finally {
countDownLatch.countDown();
}
}, 10, TimeUnit.MILLISECONDS);
Assert.assertTrue(countDownLatch.await(20, TimeUnit.MILLISECONDS));
Assert.assertEquals(fail.get(), 0);

fail.set(0);
CountDownLatch fixRateCountDownLatch = new CountDownLatch(2);
threadPool.scheduleAtFixedRate(() -> {
try {
assertTraceSpanExist();
} catch (Throwable t) {
fail.incrementAndGet();
} finally {
fixRateCountDownLatch.countDown();
}
}, 10, 10, TimeUnit.MILLISECONDS);
Assert.assertTrue(fixRateCountDownLatch.await(30, TimeUnit.MILLISECONDS));
Assert.assertEquals(fail.get(), 0);

fail.set(0);
CountDownLatch fixDelayCountDownLatch = new CountDownLatch(2);
threadPool.scheduleWithFixedDelay(() -> {
try {
assertTraceSpanExist();
} catch (Throwable t) {
fail.incrementAndGet();
} finally {
fixDelayCountDownLatch.countDown();
}
}, 10, 10, TimeUnit.MILLISECONDS);
Assert.assertTrue(fixDelayCountDownLatch.await(30, TimeUnit.MILLISECONDS));
Assert.assertEquals(fail.get(), 0);
}
}
Loading

0 comments on commit 7841cae

Please sign in to comment.