Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Improve error messages for network connectivity and memory issue #1483

Merged
merged 20 commits into from
Feb 26, 2021
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
Show all changes
20 commits
Select commit Hold shift + click to select a range
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -21,14 +21,18 @@

package com.microsoft.applicationinsights.internal.channel.common;

import java.util.concurrent.Executors;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

import com.google.common.base.Preconditions;
import com.microsoft.applicationinsights.internal.channel.TransmissionOutputAsync;
import com.microsoft.applicationinsights.internal.channel.TransmissionOutputSync;
import com.microsoft.applicationinsights.internal.heartbeat.HeartBeatProvider;
import com.microsoft.applicationinsights.internal.util.NetworkExceptionsTracker;
import com.microsoft.applicationinsights.internal.util.ThreadPoolUtils;

/**
Expand All @@ -46,6 +50,8 @@ public final class ActiveTransmissionNetworkOutput implements TransmissionOutput
private final TransmissionOutputSync actualOutput;
private final TransmissionPolicyStateFetcher transmissionPolicy;
private final int instanceId = INTSTANCE_ID_POOL.getAndIncrement();
private final ScheduledExecutorService networkIssueTracker =
Executors.newSingleThreadScheduledExecutor(ThreadPoolUtils.createDaemonThreadFactory(NetworkExceptionsTracker.class, "networkIssueTracker"));

public ActiveTransmissionNetworkOutput(TransmissionOutputSync actualOutput, TransmissionPolicyStateFetcher transmissionPolicy) {
this(actualOutput, transmissionPolicy, DEFAULT_MAX_MESSAGES_IN_BUFFER);
Expand All @@ -56,6 +62,8 @@ public ActiveTransmissionNetworkOutput(TransmissionOutputSync actualOutput, Tran

this.actualOutput = actualOutput;
this.transmissionPolicy = transmissionPolicy;
// Schedule to run every 5 minutes
this.networkIssueTracker.scheduleAtFixedRate(new NetworkExceptionsTracker(), 0,300, TimeUnit.SECONDS);
kryalama marked this conversation as resolved.
Show resolved Hide resolved

maxThreads = DEFAULT_MAX_NUMBER_OF_THREADS;
outputThreads = ThreadPoolUtils.newLimitedThreadPool(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,8 +42,11 @@
import javax.annotation.Nullable;
import java.io.IOException;
import java.net.SocketException;
import java.net.SocketTimeoutException;
import java.net.UnknownHostException;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;

/**
* The class is responsible for the actual sending of
Expand Down Expand Up @@ -77,6 +80,12 @@ public final class TransmissionNetworkOutput implements TransmissionOutputSync {

private TransmissionPolicyManager transmissionPolicyManager;

// Shared counters used to aggregate send results for throttled failure logging.
// They are public and static because NetworkExceptionsTracker reads and resets them via static imports.

// Incremented by sendSync just before it returns true for a transmission; reset to 0 by
// NetworkExceptionsTracker only on a run in which failureCounter is greater than 0.
public static final AtomicLong successCounter = new AtomicLong(0);
// Incremented by handleTemporaryExceptions for every failure after the first one; reset to 0 by
// NetworkExceptionsTracker on a run in which it is greater than 0.
public static final AtomicLong failureCounter = new AtomicLong(0);
// Value failureCounter held when NetworkExceptionsTracker last reset it; reported in the aggregated log message.
public static final AtomicLong previousFailureCounter = new AtomicLong(0);
// Value successCounter held when NetworkExceptionsTracker last reset it.
public static final AtomicLong previousSuccessCounter = new AtomicLong(0);
// Flipped to true by the first call to handleTemporaryExceptions; never reset in the visible code.
public static final AtomicBoolean firstFailure = new AtomicBoolean(false);

/**
* Creates an instance of the network transmission class.
*
Expand Down Expand Up @@ -174,28 +183,32 @@ public boolean sendSync(Transmission transmission) {
// to be throttled
transmissionPolicyManager.clearBackoff();
}
successCounter.incrementAndGet();
return true;

} catch (ConnectionPoolTimeoutException e) {
ex = e;
trask marked this conversation as resolved.
Show resolved Hide resolved
logger.error("Failed to send, connection pool timeout exception", e);
handleTemporaryExceptions("Failed to send, connection pool timeout exception. " +
"Telemetry will be stored locally and re-sent later once the connection is stable again", e);
} catch (SocketException e) {
ex = e;
logger.error("Failed to send, socket exception", e);
handleTemporaryExceptions("Failed to send, socket exception. " +
"Telemetry will be stored locally and re-sent later once the connection is stable again", e);
} catch (SocketTimeoutException e) {
handleTemporaryExceptions("Failed to send, socket timeout exception. " +
"Telemetry will be stored locally and re-sent later once the connection is stable again", e);
} catch (UnknownHostException e) {
ex = e;
logger.error("Failed to send, wrong host address or cannot reach address due to network issues", e);
} catch (IOException ioe) {
ex = ioe;
logger.error("Failed to send", ioe);
handleTemporaryExceptions("Failed to send, wrong host address or cannot reach address due to network issues. " +
"Telemetry will be stored locally and re-sent later once the connection is stable again", e);
} catch (IOException e) {
handleTemporaryExceptions("Failed to send, IO exception. " +
"Telemetry will be stored locally and re-sent later once the connection is stable again", e);
} catch (FriendlyException e) {
ex = e;
if(!friendlyExceptionThrown.getAndSet(true)) {
logger.error(e.getMessage());
}
} catch (Exception e) {
ex = e;
logger.error("Failed to send, unexpected exception", e);
handleTemporaryExceptions("Failed to send, unexpected exception. " +
"Telemetry will be stored locally and re-sent later once the connection is stable again", e);
} catch (ThreadDeath td) {
throw td;
} catch (Throwable t) {
Expand Down Expand Up @@ -237,6 +250,24 @@ public boolean sendSync(Transmission transmission) {
return true;
}

private static void handleTemporaryExceptions(String message, Exception ex) {
//Handle first failure
if(!firstFailure.getAndSet(true)) {
kryalama marked this conversation as resolved.
Show resolved Hide resolved
logger.error(message+"\n"+
"Total number of successful telemetry requests so far:"+successCounter.get()+"\n"+
"Future failures will be aggregated and logged once every 5 minutes\n",
ex
);
//Log the first failure every 5 minutes.
} else if(failureCounter.getAndIncrement() == 0) {
logger.error(message+"\n"+
"Total number of failed telemetry requests in the last 5 minutes:"+previousFailureCounter.get()+"\n"+
"Total number of successful telemetry requests in the last 5 minutes:"+previousFailureCounter.get()+"\n"+
kryalama marked this conversation as resolved.
Show resolved Hide resolved
ex
);
}
kryalama marked this conversation as resolved.
Show resolved Hide resolved
}

/**
* Generates the HTTP POST to send to the endpoint.
*
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
package com.microsoft.applicationinsights.internal.util;

import static com.microsoft.applicationinsights.internal.channel.common.TransmissionNetworkOutput.failureCounter;
import static com.microsoft.applicationinsights.internal.channel.common.TransmissionNetworkOutput.previousFailureCounter;
import static com.microsoft.applicationinsights.internal.channel.common.TransmissionNetworkOutput.previousSuccessCounter;
import static com.microsoft.applicationinsights.internal.channel.common.TransmissionNetworkOutput.successCounter;

public class NetworkExceptionsTracker implements Runnable{
@Override public void run() {
if(failureCounter.get() > 0) {
previousFailureCounter.set(failureCounter.getAndSet(0));
previousSuccessCounter.set(successCounter.getAndSet(0));
trask marked this conversation as resolved.
Show resolved Hide resolved
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,77 @@
package com.microsoft.applicationinsights.internal.util;

import java.util.Timer;
import java.util.TimerTask;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

import org.junit.*;

import static com.microsoft.applicationinsights.internal.channel.common.TransmissionNetworkOutput.failureCounter;
import static com.microsoft.applicationinsights.internal.channel.common.TransmissionNetworkOutput.previousFailureCounter;
import static com.microsoft.applicationinsights.internal.channel.common.TransmissionNetworkOutput.previousSuccessCounter;
import static com.microsoft.applicationinsights.internal.channel.common.TransmissionNetworkOutput.successCounter;
import static org.junit.Assert.*;

/**
 * Verifies that {@link NetworkExceptionsTracker} moves the current failure/success counts into the
 * "previous" counters and resets the current ones.
 */
public class NextworkExceptionsTrackerTest {
    // Created per test: an executor cannot accept new tasks once it has been shut down in tearDown.
    private ScheduledExecutorService networkIssueTrackerTest;

    @Before
    public void setUp() {
        // The counters are static and shared with other code, so start every test from a known state.
        failureCounter.set(0);
        successCounter.set(0);
        previousFailureCounter.set(0);
        previousSuccessCounter.set(0);

        networkIssueTrackerTest =
            Executors.newSingleThreadScheduledExecutor(
                ThreadPoolUtils.createDaemonThreadFactory(
                    NetworkExceptionsTracker.class,
                    "networkIssueTrackerTest"));
    }

    @After
    public void tearDown() throws InterruptedException {
        // Per-test cleanup
        if (!networkIssueTrackerTest.isShutdown()) {
            networkIssueTrackerTest.shutdown();
        }
        networkIssueTrackerTest.awaitTermination(2, TimeUnit.SECONDS);
    }

    @Test
    public void testPreviousCountersAreAssigned() throws Exception {
        assertEquals(0, previousFailureCounter.get());
        assertEquals(0, previousSuccessCounter.get());

        // Record one failure and one success BEFORE the tracker runs, so the tracker cannot observe
        // a half-updated state (the source of flakiness when incrementing concurrently with it).
        failureCounter.incrementAndGet();
        successCounter.incrementAndGet();

        // Run a single tracker pass on the executor thread and wait for it to complete.
        networkIssueTrackerTest
            .schedule(new NetworkExceptionsTracker(), 0, TimeUnit.MILLISECONDS)
            .get(2, TimeUnit.SECONDS);

        assertEquals(1, previousFailureCounter.get());
        assertEquals(1, previousSuccessCounter.get());
        assertEquals(0, successCounter.get());
        assertEquals(0, failureCounter.get());
    }
}