Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Log ingestion error on 206 response #2065

Merged
merged 11 commits into from
Jan 25, 2022
Original file line number Diff line number Diff line change
Expand Up @@ -114,7 +114,7 @@ PersistedFile loadTelemetriesFromDisk() {
byte[] ikeyBytes = new byte[36];
int rawByteLength = (int) tempFile.length() - 36;
byte[] telemetryBytes = new byte[rawByteLength];
String instrumentationKey = null;
String instrumentationKey;
try (FileInputStream fileInputStream = new FileInputStream(tempFile)) {
readFully(fileInputStream, ikeyBytes, 36);
instrumentationKey = new String(ikeyBytes, UTF_8);
Expand Down Expand Up @@ -163,13 +163,13 @@ private static void readFully(FileInputStream fileInputStream, byte[] byteArray,

// either delete the file permanently (on success or on a non-retryable error) or add it back to
// the cache to be processed again later (on a retryable failure)
public void updateProcessedFileStatus(boolean success, File file) {
public void updateProcessedFileStatus(boolean successOrNonRetryableError, File file) {
if (!file.exists()) {
// not sure why this would happen
updateOperationLogger.recordFailure("File no longer exists: " + file.getName());
return;
}
if (success) {
if (successOrNonRetryableError) {
// delete the file from the queue permanently when the http response indicates success or a
// non-retryable error.
if (!LocalStorageUtils.deleteFileWithRetries(file)) {
// TODO (heya) track file deletion failure via Statsbeat
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -67,9 +67,13 @@ public void run() {
LocalFileLoader.PersistedFile persistedFile = localFileLoader.loadTelemetriesFromDisk();
if (persistedFile != null) {
CompletableResultCode resultCode =
telemetryChannel.sendRawBytes(persistedFile.rawBytes, persistedFile.instrumentationKey);
telemetryChannel.sendRawBytes(
persistedFile.rawBytes,
persistedFile.instrumentationKey,
() -> localFileLoader.updateProcessedFileStatus(true, persistedFile.file),
retryable ->
localFileLoader.updateProcessedFileStatus(!retryable, persistedFile.file));
resultCode.join(30, TimeUnit.SECONDS); // wait max 30 seconds for request to be completed.
localFileLoader.updateProcessedFileStatus(resultCode.isSuccess(), persistedFile.file);
}
} catch (RuntimeException ex) {
logger.error(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,11 +26,14 @@
import com.azure.core.http.HttpMethod;
import com.azure.core.http.HttpPipeline;
import com.azure.core.http.HttpRequest;
import com.azure.core.http.HttpResponse;
import com.azure.core.util.Context;
import com.azure.core.util.tracing.Tracer;
import com.fasterxml.jackson.annotation.JsonInclude;
import com.fasterxml.jackson.core.JsonGenerator;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.core.io.SerializedString;
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.SerializationFeature;
import com.microsoft.applicationinsights.agent.internal.common.NetworkFriendlyExceptions;
Expand All @@ -53,11 +56,13 @@
import java.util.List;
import java.util.Map;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Consumer;
import java.util.zip.GZIPOutputStream;
import javax.annotation.Nullable;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

// TODO performance testing
public class TelemetryChannel {
Expand All @@ -68,14 +73,17 @@ public class TelemetryChannel {

private static final AppInsightsByteBufferPool byteBufferPool = new AppInsightsByteBufferPool();

// TODO (heya) should we suppress logging statsbeat telemetry ingestion issues?
private static final OperationLogger operationLogger =
new OperationLogger(TelemetryChannel.class, "Sending telemetry to the ingestion service");

private static final OperationLogger retryOperationLogger =
new OperationLogger(
TelemetryChannel.class,
"Sending telemetry to the ingestion service (telemetry will be stored to disk on failure and retried later)");
TelemetryChannel.class, "Sending telemetry to the ingestion service (retry)");

// TODO (kryalama) do we still need this AtomicBoolean, or can we use throttling built in to the
// operationLogger?
private static final AtomicBoolean friendlyExceptionThrown = new AtomicBoolean();
private final AtomicBoolean friendlyExceptionThrown = new AtomicBoolean();

@SuppressWarnings("CatchAndPrintStackTrace")
private static ObjectMapper createObjectMapper() {
Expand All @@ -91,7 +99,7 @@ private static ObjectMapper createObjectMapper() {

private final HttpPipeline pipeline;
private final URL endpointUrl;
@Nullable private final LocalFileWriter localFileWriter;
private final LocalFileWriter localFileWriter;
private final StatsbeatModule statsbeatModule;
private final boolean isStatsbeat;

Expand All @@ -107,8 +115,13 @@ public static TelemetryChannel create(
httpPipeline, endpointUrl, localFileWriter, statsbeatModule, isStatsbeat);
}

public CompletableResultCode sendRawBytes(ByteBuffer buffer, String instrumentationKey) {
return internalSend(singletonList(buffer), instrumentationKey, true);
public CompletableResultCode sendRawBytes(
ByteBuffer buffer,
String instrumentationKey,
Runnable onSuccess,
Consumer<Boolean> onFailure) {
return internalSend(
singletonList(buffer), instrumentationKey, onSuccess, onFailure, retryOperationLogger);
}

// used by tests only
Expand Down Expand Up @@ -153,7 +166,15 @@ public CompletableResultCode internalSendByInstrumentationKey(
return CompletableResultCode.ofFailure();
}
try {
return internalSend(byteBuffers, instrumentationKey, false);
return internalSend(
byteBuffers,
instrumentationKey,
() -> byteBufferPool.offer(byteBuffers),
retryable -> {
localFileWriter.writeToDisk(byteBuffers, instrumentationKey);
byteBufferPool.offer(byteBuffers);
},
operationLogger);
} catch (Throwable t) {
operationLogger.recordFailure("Error sending telemetry items: " + t.getMessage(), t);
return CompletableResultCode.ofFailure();
Expand Down Expand Up @@ -201,7 +222,11 @@ private static void writeTelemetryItems(JsonGenerator jg, List<TelemetryItem> te
* sent as {@code List<ByteBuffer>}. Persisted telemetries will be sent as byte[]
*/
private CompletableResultCode internalSend(
List<ByteBuffer> byteBuffers, String instrumentationKey, boolean persisted) {
List<ByteBuffer> byteBuffers,
String instrumentationKey,
Runnable onSuccess,
Consumer<Boolean> onFailure,
OperationLogger operationLogger) {
HttpRequest request = new HttpRequest(HttpMethod.POST, endpointUrl);

request.setBody(Flux.fromIterable(byteBuffers));
Expand Down Expand Up @@ -234,115 +259,141 @@ private CompletableResultCode internalSend(
pipeline
.send(request, Context.of(contextKeyValues))
.subscribe(
response -> {
parseResponseCode(
response.getStatusCode(), instrumentationKey, byteBuffers, persisted);
LazyHttpClient.consumeResponseBody(response);
if (!isStatsbeat) {
if (response.getStatusCode() == 200) {
statsbeatModule
.getNetworkStatsbeat()
.incrementRequestSuccessCount(
System.currentTimeMillis() - startTime, instrumentationKey);
} else {
statsbeatModule
.getNetworkStatsbeat()
.incrementRequestFailureCount(instrumentationKey);
}
}
if (!persisted) {
// persisted byte buffers don't come from the pool so shouldn't go back to the pool
byteBufferPool.offer(byteBuffers);
}
if (response.getStatusCode() == 200) {
result.succeed();
} else {
result.fail();
}
},
error -> {
// AMPLS
if (isStatsbeat && error instanceof UnknownHostException) {
// when sending a Statsbeat request and server returns an UnknownHostException, it's
// likely that
// it's using a virtual network. In that case, we use the kill-switch to turn off
// Statsbeat.
statsbeatModule.shutdown();
} else {
if (!NetworkFriendlyExceptions.logSpecialOneTimeFriendlyException(
error, endpointUrl.toString(), friendlyExceptionThrown, logger)) {
operationLogger.recordFailure(
"Error sending telemetry items: " + error.getMessage(), error);
}

if (!isStatsbeat) {
statsbeatModule
.getNetworkStatsbeat()
.incrementRequestFailureCount(instrumentationKey);
}
// no need to write to disk again when failing to send raw bytes from the persisted
// file
if (!persisted) {
writeToDiskOnFailure(byteBuffers, instrumentationKey);
}
}

if (!persisted) {
// persisted byte buffers don't come from the pool so shouldn't go back to the pool
byteBufferPool.offer(byteBuffers);
}
result.fail();
});
responseHandler(
instrumentationKey,
startTime,
() -> {
onSuccess.run();
result.succeed();
},
retryable -> {
onFailure.accept(retryable);
result.fail();
},
operationLogger),
errorHandler(
instrumentationKey,
retryable -> {
onFailure.accept(retryable);
result.fail();
},
operationLogger));
return result;
}

private void writeToDiskOnFailure(List<ByteBuffer> byteBuffers, String instrumentationKey) {
if (localFileWriter != null) {
localFileWriter.writeToDisk(byteBuffers, instrumentationKey);
private Consumer<HttpResponse> responseHandler(
String instrumentationKey,
long startTime,
Runnable onSuccess,
Consumer<Boolean> onFailure,
OperationLogger operationLogger) {

return response ->
response
.getBodyAsString()
.switchIfEmpty(Mono.just(""))
.subscribe(
body -> {
int statusCode = response.getStatusCode();
switch (statusCode) {
case 200: // SUCCESS
operationLogger.recordSuccess();
onSuccess.run();
break;
case 206: // PARTIAL CONTENT, Breeze-specific: PARTIAL SUCCESS
operationLogger.recordFailure(
getErrorMessageFromPartialSuccessResponse(body));
onFailure.accept(false);
break;
case 401: // breeze returns if aad enabled and no authentication token provided
case 403: // breeze returns if aad enabled or disabled (both cases) and
// wrong/expired credentials provided
case 408: // REQUEST TIMEOUT
case 429: // TOO MANY REQUESTS
case 500: // INTERNAL SERVER ERROR
case 503: // SERVICE UNAVAILABLE
operationLogger.recordFailure(
"received response code "
+ statusCode
+ " (telemetry will be stored to disk and retried later)");
onFailure.accept(true);
break;
Comment on lines +311 to +320
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is the list of response codes that 2.x used to retry on. I think it's a good list; I'm not sure when we diverged from it.

case 439: // Breeze-specific: THROTTLED OVER EXTENDED TIME
// TODO handle throttling
operationLogger.recordFailure(
"received response code 439 (throttled over extended time)");
onFailure.accept(false);
break;
default:
operationLogger.recordFailure("received response code: " + statusCode);
onFailure.accept(false);
}
if (!isStatsbeat) {
handleStatsbeatOnResponse(instrumentationKey, startTime, statusCode);
}
},
exception -> {
operationLogger.recordFailure("exception retrieving response body", exception);
onFailure.accept(false);
});
}

/**
 * Records the outcome of a completed ingestion request in the network statsbeat.
 *
 * <p>A 200 response is counted as a success together with the elapsed time; every other status
 * code is counted as a failure, and 439 is additionally counted as throttling.
 *
 * @param instrumentationKey instrumentation key passed through to the statsbeat counters
 * @param startTime timestamp that is subtracted from {@link System#currentTimeMillis()} to
 *     compute the request duration
 * @param statusCode HTTP status code of the response
 */
private void handleStatsbeatOnResponse(
    String instrumentationKey, long startTime, int statusCode) {
  if (statusCode == 200) {
    long elapsedMillis = System.currentTimeMillis() - startTime;
    statsbeatModule
        .getNetworkStatsbeat()
        .incrementRequestSuccessCount(elapsedMillis, instrumentationKey);
    return;
  }

  // anything other than 200 counts as a failed request
  statsbeatModule.getNetworkStatsbeat().incrementRequestFailureCount(instrumentationKey);

  // 439 is counted both as a failure (above) and as throttling
  if (statusCode == 439) {
    statsbeatModule.getNetworkStatsbeat().incrementThrottlingCount(instrumentationKey);
  }
}

private void parseResponseCode(
int statusCode, String instrumentationKey, List<ByteBuffer> byteBuffers, boolean persisted) {
switch (statusCode) {
case 401: // UNAUTHORIZED
case 403: // FORBIDDEN
logger.warn(
"Failed to send telemetry with status code:{}, please check your credentials",
statusCode);
// no need to write to disk again when failing to send raw bytes from the persisted file
if (!persisted) {
writeToDiskOnFailure(byteBuffers, instrumentationKey);
trask marked this conversation as resolved.
Show resolved Hide resolved
}
break;
case 408: // REQUEST TIMEOUT
case 500: // INTERNAL SERVER ERROR
case 503: // SERVICE UNAVAILABLE
case 429: // TOO MANY REQUESTS
case 439: // Breeze-specific: THROTTLED OVER EXTENDED TIME
// TODO handle throttling
// TODO (heya) track throttling count via Statsbeat
// instrumentationKey is null when sending persisted file's raw bytes.
if (!isStatsbeat) {
statsbeatModule.getNetworkStatsbeat().incrementThrottlingCount(instrumentationKey);
}
break;
case 200: // SUCCESS
operationLogger.recordSuccess();
break;
case 206: // PARTIAL CONTENT, Breeze-specific: PARTIAL SUCCESS
// TODO handle partial success
break;
case 0: // client-side exception
// TODO exponential backoff and retry to a limit
// TODO (heya) track failure count via Statsbeat
// instrumentationKey is null when sending persisted file's raw bytes.
if (!isStatsbeat) {
statsbeatModule.getNetworkStatsbeat().incrementRetryCount(instrumentationKey);
}
break;
default:
// ok
/**
 * Builds the callback that handles an error signalled while sending a request.
 *
 * <p>The {@code onFailure} callback is always invoked exactly once: with {@code false} when a
 * Statsbeat request fails with {@link UnknownHostException} (Statsbeat is shut down in that
 * case), and with {@code true} for every other error.
 *
 * @param instrumentationKey instrumentation key used when counting the failure in statsbeat
 * @param onFailure callback receiving the flag described above
 * @param operationLogger logger that records the failure when no one-time friendly message was
 *     logged for the error
 * @return the error callback
 */
private Consumer<Throwable> errorHandler(
    String instrumentationKey, Consumer<Boolean> onFailure, OperationLogger operationLogger) {

  return throwable -> {
    boolean statsbeatHostUnknown = isStatsbeat && throwable instanceof UnknownHostException;
    if (statsbeatHostUnknown) {
      // when sending a Statsbeat request and server returns an UnknownHostException, it's
      // likely that it's using AMPLS. In that case, we use the kill-switch to turn off Statsbeat.
      statsbeatModule.shutdown();
      onFailure.accept(false);
    } else {
      // TODO (trask) only log one-time friendly exception if no prior successes
      boolean friendlyMessageLogged =
          NetworkFriendlyExceptions.logSpecialOneTimeFriendlyException(
              throwable, endpointUrl.toString(), friendlyExceptionThrown, logger);
      if (!friendlyMessageLogged) {
        operationLogger.recordFailure(
            "Error sending telemetry items: " + throwable.getMessage(), throwable);
      }

      if (!isStatsbeat) {
        statsbeatModule.getNetworkStatsbeat().incrementRequestFailureCount(instrumentationKey);
      }

      onFailure.accept(true);
    }
  };
}

private static String getErrorMessageFromPartialSuccessResponse(String body) {
trask marked this conversation as resolved.
Show resolved Hide resolved
JsonNode jsonNode;
try {
jsonNode = new ObjectMapper().readTree(body);
} catch (JsonProcessingException e) {
return "ingestion service returned 206, but could not parse response as json: " + body;
}
List<JsonNode> errors = new ArrayList<>();
jsonNode.get("errors").forEach(errors::add);
StringBuilder message = new StringBuilder();
message.append(errors.get(0).get("message").asText());
int moreErrors = errors.size() - 1;
if (moreErrors > 0) {
message.append(" (and ").append(moreErrors).append(" more)");
}
return message.toString();
}
}
Loading