Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix thread leak #568

Merged
merged 11 commits into from
Mar 7, 2018
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
- Fixed performance issue on SDK startup.
- Fixed PageView telemetry data not being reported.
- Fixed Issue #526 (NPE in MapUtil.copy())
- Fixed Issue #513 (Memory leak in SDKShutdownActivity). This fix upgrades our Servlet version from 2.5 to 3.0. The SDK must now be run on an application server supporting Servlet 3.0.

## Version 2.0.0-BETA
- Updating various dependencies to latest version
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -161,7 +161,8 @@ private static String createMessage(String prefix, String message, Object... arg
currentDateAsString = dateFormatter.format(new Date());
}
String formattedMessage = String.format(message, args);
String theMessage = String.format("%s %s, %d: %s", prefix, currentDateAsString, Thread.currentThread().getId(), formattedMessage);
final Thread thisThread = Thread.currentThread();
String theMessage = String.format("%s %s, %d(%s): %s", prefix, currentDateAsString, thisThread.getId(), thisThread.getName(), formattedMessage);
return theMessage;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,10 @@

import com.microsoft.applicationinsights.TelemetryConfiguration;
import com.microsoft.applicationinsights.extensibility.TelemetryInitializer;
import com.microsoft.applicationinsights.extensibility.initializer.docker.internal.*;
import com.microsoft.applicationinsights.extensibility.initializer.docker.internal.Constants;
import com.microsoft.applicationinsights.extensibility.initializer.docker.internal.DockerContext;
import com.microsoft.applicationinsights.extensibility.initializer.docker.internal.DockerContextPoller;
import com.microsoft.applicationinsights.extensibility.initializer.docker.internal.FileFactory;
import com.microsoft.applicationinsights.internal.logger.InternalLogger;
import com.microsoft.applicationinsights.internal.util.LocalStringsUtils;
import com.microsoft.applicationinsights.telemetry.Telemetry;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,9 +41,9 @@ protected DockerContextPoller(File contextFile, DockerContextFactory dockerConte
}

public DockerContextPoller(String contextFileDirectory) {
this(new File(contextFileDirectory + "/" + CONTEXT_FILE_NAME), new DockerContextFactory());
this.setDaemon(true);
this.contextFile = new File(contextFileDirectory + "/" + CONTEXT_FILE_NAME);
this.dockerContextFactory = new DockerContextFactory();
this.setName(DockerContextPoller.class.getSimpleName());
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,9 +22,9 @@
package com.microsoft.applicationinsights.internal.channel.common;

import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

import com.google.common.base.Preconditions;
import com.microsoft.applicationinsights.internal.channel.TransmissionOutput;
Expand All @@ -38,11 +38,11 @@
* Created by gupele on 12/22/2014.
*/
public final class ActiveTransmissionFileSystemOutput implements TransmissionOutput {
private static final AtomicInteger INSTANCE_ID_POOL = new AtomicInteger(1);
private final ThreadPoolExecutor threadPool;

private final TransmissionOutput actualOutput;

private final TransmissionPolicyStateFetcher transmissionPolicy;
private final int instanceId = INSTANCE_ID_POOL.getAndIncrement();

public ActiveTransmissionFileSystemOutput(TransmissionOutput actualOutput, TransmissionPolicyStateFetcher transmissionPolicy) {
Preconditions.checkNotNull(transmissionPolicy, "transmissionPolicy must be a non-null value");
Expand All @@ -52,14 +52,7 @@ public ActiveTransmissionFileSystemOutput(TransmissionOutput actualOutput, Trans
this.transmissionPolicy = transmissionPolicy;

threadPool = ThreadPoolUtils.newLimitedThreadPool(1, 3, 20L, 1024);
threadPool.setThreadFactory(new ThreadFactory() {
@Override
public Thread newThread(Runnable r) {
Thread thread = new Thread(r);
thread.setDaemon(true);
return thread;
}
});
threadPool.setThreadFactory(ThreadPoolUtils.createDaemonThreadFactory(ActiveTransmissionFileSystemOutput.class, instanceId));
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -88,6 +88,7 @@ public ActiveTransmissionLoader(final TransmissionFileSystemOutput fileSystem,
this.fileSystem = fileSystem;
this.dispatcher = dispatcher;
threads = new Thread[numberOfThreads];
final String threadNameFmt = String.format("%s-worker-%%d", ActiveTransmissionLoader.class.getSimpleName());
for (int i = 0; i < numberOfThreads; ++i) {
threads[i] = new Thread(new Runnable() {
@Override
Expand Down Expand Up @@ -130,7 +131,7 @@ public void run() {
// TODO: check whether we need to pause after exception
}
}
});
}, String.format(threadNameFmt, i));
threads[i].setDaemon(true);
}}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,9 +22,9 @@
package com.microsoft.applicationinsights.internal.channel.common;

import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

import com.google.common.base.Preconditions;
import com.microsoft.applicationinsights.internal.channel.TransmissionOutput;
Expand All @@ -38,14 +38,13 @@ public final class ActiveTransmissionNetworkOutput implements TransmissionOutput
private final static int DEFAULT_MIN_NUMBER_OF_THREADS = 7;
private final static int DEFAULT_MAX_NUMBER_OF_THREADS = 7;
private final static long DEFAULT_REMOVE_IDLE_THREAD_TIMEOUT_IN_SECONDS = 60L;
private final static AtomicInteger INTSTANCE_ID_POOL = new AtomicInteger(1);

private final int maxThreads;

private final ThreadPoolExecutor outputThreads;

private final TransmissionOutput actualOutput;

private final TransmissionPolicyStateFetcher transmissionPolicy;
private final int instanceId = INTSTANCE_ID_POOL.getAndIncrement();

public ActiveTransmissionNetworkOutput(TransmissionOutput actualOutput, TransmissionPolicyStateFetcher transmissionPolicy) {
this(actualOutput, transmissionPolicy, DEFAULT_MAX_MESSAGES_IN_BUFFER);
Expand All @@ -63,14 +62,7 @@ public ActiveTransmissionNetworkOutput(TransmissionOutput actualOutput, Transmis
maxThreads,
DEFAULT_REMOVE_IDLE_THREAD_TIMEOUT_IN_SECONDS,
maxMessagesInBuffer);
outputThreads.setThreadFactory(new ThreadFactory() {
@Override
public Thread newThread(Runnable r) {
Thread thread = new Thread(r);
thread.setDaemon(true);
return thread;
}
});
outputThreads.setThreadFactory(ThreadPoolUtils.createDaemonThreadFactory(ActiveTransmissionNetworkOutput.class, instanceId));
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,13 +37,14 @@
*/
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I have a fundamental question. Since we are internally using Apache HTTP Client 4.5.3 to send the data, why do we need this sender in the first place? I think if there is anywhere still we are using HTTP Client 4.2 we should remove those. For eg ApacheSenderFactory looks for old jar on the class path and creates an instance of ApacheSender42. I think we do not need it at all. @littleaj can you take a look as a part of this improvements and trim the unnecessary code there ?

Copy link
Contributor Author

@littleaj littleaj Feb 23, 2018

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

That's something we can address separately. If it's worth exploring, we should file an issue.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please file an issue to keep an eye on. We can then close if not needed.

final class ApacheSender42 implements ApacheSender {

private HttpClient httpClient;
private final PoolingClientConnectionManager cm;
private final HttpClient httpClient;

public ApacheSender42() {
PoolingClientConnectionManager cm = new PoolingClientConnectionManager();
cm = new PoolingClientConnectionManager();
cm.setMaxTotal(DEFAULT_MAX_TOTAL_CONNECTIONS);
cm.setDefaultMaxPerRoute(DEFAULT_MAX_CONNECTIONS_PER_ROUTE);

httpClient = new DefaultHttpClient(cm);

HttpParams params = httpClient.getParams();
Expand All @@ -65,6 +66,11 @@ public void dispose(HttpResponse response) {

@Override
public void close() {
try {
cm.shutdown();
} catch (Exception e) {
// chomp
}
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,8 +24,8 @@
import java.util.Calendar;
import java.util.Date;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;

import com.microsoft.applicationinsights.internal.logger.InternalLogger;
Expand All @@ -47,6 +47,7 @@
* Created by gupele on 6/29/2015.
*/
public final class TransmissionPolicyManager implements Stoppable {
private static final AtomicInteger INSTANCE_ID_POOL = new AtomicInteger(1);

// The future date the the transmission is blocked
private Date suspensionDate;
Expand All @@ -61,6 +62,8 @@ public final class TransmissionPolicyManager implements Stoppable {
private final TransmissionPolicyState policyState = new TransmissionPolicyState();
private boolean throttlingIsEnabled = true;

private final int instanceId = INSTANCE_ID_POOL.getAndIncrement();

/**
* The class will be activated when a timeout expires
*/
Expand Down Expand Up @@ -149,14 +152,7 @@ private synchronized void createScheduler() {
}

threads = new ScheduledThreadPoolExecutor(1);
threads.setThreadFactory(new ThreadFactory() {
@Override
public Thread newThread(Runnable r) {
Thread thread = new Thread(r);
thread.setDaemon(true);
return thread;
}
});
threads.setThreadFactory(ThreadPoolUtils.createDaemonThreadFactory(TransmissionPolicyManager.class, instanceId));

SDKShutdownActivity.INSTANCE.register(this);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,8 +24,8 @@
import java.util.Collection;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.Semaphore;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

import com.microsoft.applicationinsights.internal.channel.TelemetriesTransmitter;
import com.microsoft.applicationinsights.internal.channel.TelemetrySerializer;
Expand Down Expand Up @@ -112,7 +112,9 @@ public void run() {
}
}

private final static int MAX_PENDING_SCHEDULE_REQUESTS = 16384;
private static final int MAX_PENDING_SCHEDULE_REQUESTS = 16384;

private static final AtomicInteger INSTANCE_ID_POOL = new AtomicInteger(1);

private final TransmissionDispatcher transmissionDispatcher;

Expand All @@ -124,6 +126,8 @@ public void run() {

private final Semaphore semaphore;

private final int instanceId = INSTANCE_ID_POOL.getAndIncrement();

public TransmitterImpl(TransmissionDispatcher transmissionDispatcher, TelemetrySerializer serializer, TransmissionsLoader transmissionsLoader) {
Preconditions.checkNotNull(transmissionDispatcher, "transmissionDispatcher must be non-null value");
Preconditions.checkNotNull(serializer, "serializer must be non-null value");
Expand All @@ -135,14 +139,7 @@ public TransmitterImpl(TransmissionDispatcher transmissionDispatcher, TelemetryS
semaphore = new Semaphore(MAX_PENDING_SCHEDULE_REQUESTS);

threadPool = new ScheduledThreadPoolExecutor(2);
threadPool.setThreadFactory(new ThreadFactory() {
@Override
public Thread newThread(Runnable r) {
Thread thread = new Thread(r);
thread.setDaemon(true);
return thread;
}
});
threadPool.setThreadFactory(ThreadPoolUtils.createDaemonThreadFactory(TransmitterImpl.class, instanceId));

this.transmissionsLoader = transmissionsLoader;
this.transmissionsLoader.load(false);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,6 @@
import java.util.Date;
import java.util.Set;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;

Expand Down Expand Up @@ -232,14 +231,7 @@ public boolean isSampledIn(Telemetry telemetry) {

private void createTimerThread() {
threads = new ScheduledThreadPoolExecutor(1);
threads.setThreadFactory(new ThreadFactory() {
@Override
public Thread newThread(Runnable r) {
Thread thread = new Thread(r);
thread.setDaemon(true);
return thread;
}
});
threads.setThreadFactory(ThreadPoolUtils.createDaemonThreadFactory(AdaptiveTelemetrySampler.class));
}

private int getIntValueOrDefault(String name, String valueAsString, int defaultValue, int minValue, int maxValue) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -226,7 +226,8 @@ private static String createMessage(String prefix, String message, Object... arg
currentDateAsString = dateFormatter.format(new Date());
}
String formattedMessage = String.format(message, args);
String theMessage = String.format("%s %s, %d: %s", prefix, currentDateAsString, Thread.currentThread().getId(), formattedMessage);
final Thread thisThread = Thread.currentThread();
String theMessage = String.format("%s %s, %d(%s): %s", prefix, currentDateAsString, thisThread.getId(), thisThread.getName(), formattedMessage);
return theMessage;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,6 @@
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.ThreadFactory;

import com.google.common.base.Preconditions;
import com.google.common.base.Strings;
Expand Down Expand Up @@ -259,14 +258,7 @@ public void run() {

private void createThreadToCollect() {
threads = new ScheduledThreadPoolExecutor(1);
threads.setThreadFactory(new ThreadFactory() {
@Override
public Thread newThread(Runnable r) {
Thread thread = new Thread(r);
thread.setDaemon(true);
return thread;
}
});
threads.setThreadFactory(ThreadPoolUtils.createDaemonThreadFactory(PerformanceCounterContainer.class));
}

public void setPlugin(PerformanceCountersCollectionPlugin plugin) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -80,11 +80,11 @@ public void initialize() {

coordinator = new DefaultQuickPulseCoordinator(coordinatorInitData);

senderThread = new Thread(quickPulseDataSender);
senderThread = new Thread(quickPulseDataSender, QuickPulseDataSender.class.getSimpleName());
senderThread.setDaemon(true);
senderThread.start();

thread = new Thread(coordinator);
thread = new Thread(coordinator, DefaultQuickPulseCoordinator.class.getSimpleName());
thread.setDaemon(true);
thread.start();

Expand Down
Loading