Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Turn off Statsbeat when proxy is used or any exception from the server #2221

Merged
merged 17 commits into from
Apr 13, 2022
Original file line number Diff line number Diff line change
Expand Up @@ -28,28 +28,40 @@
// e.g. sending telemetry to the portal, storing telemetry to disk, ...
public class OperationLogger {

private final AggregatingLogger aggregatingLogger;
public static final OperationLogger NOOP = new OperationLogger(null);

@Nullable private final AggregatingLogger aggregatingLogger;

public OperationLogger(Class<?> source, String operation) {
this(source, operation, 300);
}

// visible for testing
OperationLogger(Class<?> source, String operation, int intervalSeconds) {
aggregatingLogger = new AggregatingLogger(source, operation, true, intervalSeconds);
this(new AggregatingLogger(source, operation, true, intervalSeconds));
}

private OperationLogger(@Nullable AggregatingLogger aggregatingLogger) {
this.aggregatingLogger = aggregatingLogger;
}

public void recordSuccess() {
aggregatingLogger.recordSuccess();
if (aggregatingLogger != null) {
aggregatingLogger.recordSuccess();
}
}

// failureMessage should have low cardinality
public void recordFailure(String failureMessage) {
aggregatingLogger.recordWarning(failureMessage);
if (aggregatingLogger != null) {
aggregatingLogger.recordWarning(failureMessage);
}
}

// failureMessage should have low cardinality
public void recordFailure(String failureMessage, @Nullable Throwable exception) {
aggregatingLogger.recordWarning(failureMessage, exception);
if (aggregatingLogger != null) {
aggregatingLogger.recordWarning(failureMessage, exception);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -49,14 +49,14 @@
import java.io.IOException;
import java.io.StringWriter;
import java.net.URL;
import java.net.UnknownHostException;
import java.nio.ByteBuffer;
import java.nio.channels.UnresolvedAddressException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Consumer;
import java.util.zip.GZIPOutputStream;
import javax.annotation.Nullable;
Expand All @@ -74,18 +74,18 @@ public class TelemetryChannel {

private static final AppInsightsByteBufferPool byteBufferPool = new AppInsightsByteBufferPool();

// TODO (heya) should we suppress logging statsbeat telemetry ingestion issues?
private static final OperationLogger operationLogger =
new OperationLogger(TelemetryChannel.class, "Sending telemetry to the ingestion service");

private static final OperationLogger retryOperationLogger =
new OperationLogger(
TelemetryChannel.class, "Sending telemetry to the ingestion service (retry)");
private final OperationLogger operationLogger;
private final OperationLogger retryOperationLogger;

// TODO (kryalama) do we still need this AtomicBoolean, or can we use throttling built in to the
// operationLogger?
private final AtomicBoolean friendlyExceptionThrown = new AtomicBoolean();

private final AtomicInteger statsbeatUnableToReachBreezeCounter = new AtomicInteger();
trask marked this conversation as resolved.
Show resolved Hide resolved
private final AtomicBoolean statsbeatHasBeenShutdown = new AtomicBoolean();
trask marked this conversation as resolved.
Show resolved Hide resolved

private volatile boolean statsbeatHasReachedBreezeAtLeastOnce;
trask marked this conversation as resolved.
Show resolved Hide resolved

@SuppressWarnings("CatchAndPrintStackTrace")
private static ObjectMapper createObjectMapper() {
ObjectMapper mapper = new ObjectMapper();
Expand Down Expand Up @@ -121,6 +121,10 @@ public CompletableResultCode sendRawBytes(
String instrumentationKey,
Runnable onSuccess,
Consumer<Boolean> onFailure) {
if (isStatsbeat && statsbeatHasBeenShutdown.get()) {
trask marked this conversation as resolved.
Show resolved Hide resolved
// let it be deleted from disk so that it won't keep getting retried
return CompletableResultCode.ofSuccess();
}
return internalSend(
singletonList(buffer), instrumentationKey, onSuccess, onFailure, retryOperationLogger);
}
Expand All @@ -137,6 +141,18 @@ public TelemetryChannel(
this.localFileWriter = localFileWriter;
this.statsbeatModule = statsbeatModule;
this.isStatsbeat = isStatsbeat;

if (isStatsbeat) {
// suppress all logging for statsbeat telemetry failures
operationLogger = OperationLogger.NOOP;
retryOperationLogger = OperationLogger.NOOP;
} else {
operationLogger =
new OperationLogger(TelemetryChannel.class, "Sending telemetry to the ingestion service");
retryOperationLogger =
new OperationLogger(
TelemetryChannel.class, "Sending telemetry to the ingestion service (retry)");
}
}

public CompletableResultCode send(List<TelemetryItem> telemetryItems) {
Expand Down Expand Up @@ -184,7 +200,7 @@ public CompletableResultCode internalSendByInstrumentationKey(

List<ByteBuffer> encode(List<TelemetryItem> telemetryItems) throws IOException {

if (logger.isDebugEnabled()) {
if (!isStatsbeat && logger.isDebugEnabled()) {
StringWriter debug = new StringWriter();
try (JsonGenerator jg = mapper.createGenerator(debug)) {
writeTelemetryItems(jg, telemetryItems);
Expand Down Expand Up @@ -282,6 +298,11 @@ private CompletableResultCode internalSend(
return result;
}

// not including 401/403/503 in this list because those are commonly returned by proxy servers
// when they are not configured to allow traffic for westus-0
private static final Set<Integer> RESPONSE_CODES_INDICATING_REACHED_BREEZE =
new HashSet<>(asList(200, 206, 402, 408, 429, 439, 500));
trask marked this conversation as resolved.
Show resolved Hide resolved
trask marked this conversation as resolved.
Show resolved Hide resolved

private Consumer<HttpResponse> responseHandler(
String instrumentationKey,
long startTime,
Expand All @@ -296,6 +317,13 @@ private Consumer<HttpResponse> responseHandler(
.subscribe(
body -> {
int statusCode = response.getStatusCode();
if (isStatsbeat && !statsbeatHasReachedBreezeAtLeastOnce) {
if (RESPONSE_CODES_INDICATING_REACHED_BREEZE.contains(statusCode)) {
statsbeatHasReachedBreezeAtLeastOnce = true;
} else {
statsbeatDidNotReachBreeze();
}
}
switch (statusCode) {
case 200: // SUCCESS
operationLogger.recordSuccess();
Expand Down Expand Up @@ -364,19 +392,13 @@ private Consumer<Throwable> errorHandler(
String instrumentationKey, Consumer<Boolean> onFailure, OperationLogger operationLogger) {

return error -> {
if (isStatsbeat
&& (error instanceof UnknownHostException
|| error instanceof UnresolvedAddressException)) {
// when sending a Statsbeat request and server returns an UnknownHostException, it's
// likely that it's using AMPLS. In that case, we use the kill-switch to turn off Statsbeat.
statsbeatModule.shutdown();
onFailure.accept(false);
return;
if (isStatsbeat && !statsbeatHasReachedBreezeAtLeastOnce) {
statsbeatDidNotReachBreeze();
}

// TODO (trask) only log one-time friendly exception if no prior successes
if (!NetworkFriendlyExceptions.logSpecialOneTimeFriendlyException(
error, endpointUrl.toString(), friendlyExceptionThrown, logger)) {
if (!isStatsbeat
&& !NetworkFriendlyExceptions.logSpecialOneTimeFriendlyException(
error, endpointUrl.toString(), friendlyExceptionThrown, logger)) {
operationLogger.recordFailure(
"Error sending telemetry items: " + error.getMessage(), error);
}
Expand All @@ -389,6 +411,21 @@ private Consumer<Throwable> errorHandler(
};
}

private void statsbeatDidNotReachBreeze() {
if (statsbeatUnableToReachBreezeCounter.getAndIncrement() >= 10
&& !statsbeatHasBeenShutdown.getAndSet(true)) {
trask marked this conversation as resolved.
Show resolved Hide resolved
// shutting down statsbeat because it's unlikely that it will ever get through at this point
// some possible reasons:
// * AMPLS
// * proxy that has not been configured to allow westus-0
// * local firewall that has not been configured to allow westus-0
//
// TODO need to figure out a way that statsbeat telemetry can be sent to the same endpoint as
// the customer data for these cases
statsbeatModule.shutdown();
}
}

private static String getErrorMessageFromPartialSuccessResponse(String body) {
JsonNode jsonNode;
try {
Expand Down