Skip to content

Commit

Permalink
Deprecates Collector.acceptSpans(List<byte[]> ..
Browse files Browse the repository at this point in the history
  • Loading branch information
Adrian Cole committed Aug 23, 2017
1 parent 190ae60 commit bcc1ed9
Show file tree
Hide file tree
Showing 8 changed files with 386 additions and 198 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -16,11 +16,12 @@
import java.util.Collections;
import kafka.consumer.ConsumerIterator;
import kafka.consumer.KafkaStream;
import zipkin.Span;
import zipkin.SpanDecoder;
import zipkin.collector.Collector;
import zipkin.collector.CollectorMetrics;

import static zipkin.SpanDecoder.DETECTING_DECODER;
import static zipkin.SpanDecoder.THRIFT_DECODER;
import static zipkin.storage.Callback.NOOP;

/** Consumes spans from Kafka messages, ignoring malformed input */
Expand Down Expand Up @@ -48,10 +49,17 @@ public void run() {
continue;
}

if (bytes[0] == '[' /* json list */ || bytes[0] == 12 /* thrift list */) {
// If we received legacy single-span encoding, decode it into a singleton list
if (bytes[0] <= 16 && bytes[0] != 12 /* thrift, but not a list */) {
try {
metrics.incrementBytes(bytes.length);
Span span = SpanDecoder.THRIFT_DECODER.readSpan(bytes);
collector.accept(Collections.singletonList(span), NOOP);
} catch (RuntimeException e) {
metrics.incrementMessagesDropped();
}
} else {
collector.acceptSpans(bytes, DETECTING_DECODER, NOOP);
} else { // assume legacy single-span encoding
collector.acceptSpans(Collections.singletonList(bytes), THRIFT_DECODER, NOOP);
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,11 +27,12 @@
import org.apache.kafka.common.TopicPartition;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import zipkin.Span;
import zipkin.SpanDecoder;
import zipkin.collector.Collector;
import zipkin.collector.CollectorMetrics;

import static zipkin.SpanDecoder.DETECTING_DECODER;
import static zipkin.SpanDecoder.THRIFT_DECODER;
import static zipkin.storage.Callback.NOOP;

/** Consumes spans from Kafka messages, ignoring malformed input */
Expand Down Expand Up @@ -75,10 +76,17 @@ public void run() {
if (bytes.length == 0) {
metrics.incrementMessagesDropped();
} else {
if (bytes[0] == '[' /* json list */ || bytes[0] == 12 /* thrift list */) {
// If we received legacy single-span encoding, decode it into a singleton list
if (bytes[0] <= 16 && bytes[0] != 12 /* thrift, but not a list */) {
metrics.incrementBytes(bytes.length);
try {
Span span = SpanDecoder.THRIFT_DECODER.readSpan(bytes);
collector.accept(Collections.singletonList(span), NOOP);
} catch (RuntimeException e) {
metrics.incrementMessagesDropped();
}
} else {
collector.acceptSpans(bytes, DETECTING_DECODER, NOOP);
} else { // assume legacy single-span encoding
collector.acceptSpans(Collections.singletonList(bytes), THRIFT_DECODER, NOOP);
}
}
}
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/**
* Copyright 2015-2016 The OpenZipkin Authors
* Copyright 2015-2017 The OpenZipkin Authors
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
* in compliance with the License. You may obtain a copy of the License at
Expand All @@ -20,7 +20,8 @@
import java.util.Base64;
import java.util.List;
import java.util.stream.Collectors;
import zipkin.Codec;
import zipkin.Span;
import zipkin.SpanDecoder;
import zipkin.collector.Collector;
import zipkin.collector.CollectorMetrics;
import zipkin.internal.Nullable;
Expand All @@ -40,20 +41,22 @@ public ScribeSpanConsumer(ScribeCollector.Builder builder) {
@Override
public ListenableFuture<ResultCode> log(List<LogEntry> messages) {
metrics.incrementMessages();
List<byte[]> thrifts;
List<Span> spans;
try {
thrifts = messages.stream()
spans = messages.stream()
.filter(m -> m.category.equals(category))
.map(m -> m.message.getBytes(StandardCharsets.ISO_8859_1))
.map(b -> Base64.getMimeDecoder().decode(b)) // finagle-zipkin uses mime encoding
.peek(b -> metrics.incrementBytes(b.length))
.map(SpanDecoder.THRIFT_DECODER::readSpan)
.collect(Collectors.toList());
} catch (RuntimeException e) {
metrics.incrementMessagesDropped();
return Futures.immediateFailedFuture(e);
}

SettableFuture<ResultCode> result = SettableFuture.create();
collector.acceptSpans(thrifts, Codec.THRIFT, new Callback<Void>() {
collector.accept(spans, new Callback<Void>() {
@Override public void onSuccess(@Nullable Void value) {
result.set(ResultCode.OK);
}
Expand Down
123 changes: 20 additions & 103 deletions zipkin/src/main/java/zipkin/collector/Collector.java
Original file line number Diff line number Diff line change
Expand Up @@ -14,16 +14,15 @@
package zipkin.collector;

import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.logging.Logger;
import zipkin.Span;
import zipkin.SpanDecoder;
import zipkin.internal.DetectingSpanDecoder;
import zipkin.storage.Callback;
import zipkin.storage.StorageComponent;

import static java.lang.String.format;
import static java.util.logging.Level.WARNING;
import static zipkin.internal.DetectingSpanDecoder.detectFormat;
import static zipkin.internal.Util.checkNotNull;

/**
Expand All @@ -34,7 +33,7 @@
* before storage is attempted. This ensures that calling threads are disconnected from storage
* threads.
*/
public class Collector { // not final for mocking
public class Collector extends zipkin.internal.Collector<SpanDecoder, Span> { // not final for mock

/** Needed to scope this to the correct logging category */
public static Builder builder(Class<?> loggingClass) {
Expand Down Expand Up @@ -74,31 +73,31 @@ public Collector build() {
}
}

final Logger logger;
final StorageComponent storage;
final CollectorSampler sampler;
final CollectorMetrics metrics;
final StorageComponent storage;

Collector(Builder builder) {
this.logger = checkNotNull(builder.logger, "logger");
super(builder.logger, builder.metrics);
this.storage = checkNotNull(builder.storage, "storage");
this.sampler = builder.sampler == null ? CollectorSampler.ALWAYS_SAMPLE : builder.sampler;
this.metrics = builder.metrics == null ? CollectorMetrics.NOOP_METRICS : builder.metrics;
}

@Override
public void acceptSpans(byte[] serializedSpans, SpanDecoder decoder, Callback<Void> callback) {
metrics.incrementBytes(serializedSpans.length);
List<Span> spans;
try {
spans = decoder.readSpans(serializedSpans);
if (decoder instanceof DetectingSpanDecoder) decoder = detectFormat(serializedSpans);
} catch (RuntimeException e) {
metrics.incrementBytes(serializedSpans.length);
callback.onError(errorReading(e));
return;
}
accept(spans, callback);
super.acceptSpans(serializedSpans, decoder, callback);
}

public void acceptSpans(List<byte[]> serializedSpans, SpanDecoder decoder,
/**
* @deprecated All transports accept encoded lists of spans. Please update reporters to do so.
*/
@Deprecated public void acceptSpans(List<byte[]> serializedSpans, SpanDecoder decoder,
Callback<Void> callback) {
List<Span> spans = new ArrayList<>(serializedSpans.size());
try {
Expand All @@ -115,101 +114,19 @@ public void acceptSpans(List<byte[]> serializedSpans, SpanDecoder decoder,
accept(spans, callback);
}

public void accept(List<Span> spans, Callback<Void> callback) {
if (spans.isEmpty()) {
callback.onSuccess(null);
return;
}
metrics.incrementSpans(spans.size());

List<Span> sampled = sample(spans);
if (sampled.isEmpty()) {
callback.onSuccess(null);
return;
}

try {
storage.asyncSpanConsumer().accept(sampled, acceptSpansCallback(sampled));
callback.onSuccess(null);
} catch (RuntimeException e) {
callback.onError(errorStoringSpans(sampled, e));
return;
}
@Override protected List<Span> decodeList(SpanDecoder decoder, byte[] serialized) {
return decoder.readSpans(serialized);
}

List<Span> sample(List<Span> input) {
List<Span> sampled = new ArrayList<>(input.size());
for (Span s : input) {
if (sampler.isSampled(s.traceId, s.debug)) sampled.add(s);
}
int dropped = input.size() - sampled.size();
if (dropped > 0) metrics.incrementSpansDropped(dropped);
return sampled;
@Override protected boolean isSampled(Span span) {
return sampler.isSampled(span.traceId, span.debug);
}

Callback<Void> acceptSpansCallback(final List<Span> spans) {
return new Callback<Void>() {
@Override public void onSuccess(Void value) {
}

@Override public void onError(Throwable t) {
errorStoringSpans(spans, t);
}

@Override
public String toString() {
return appendSpanIds(spans, new StringBuilder("AcceptSpans(")).append(")").toString();
}
};
}

RuntimeException errorReading(Throwable e) {
return errorReading("Cannot decode spans", e);
}

RuntimeException errorReading(String message, Throwable e) {
metrics.incrementMessagesDropped();
return doError(message, e);
}

/**
* When storing spans, an exception can be raised before or after the fact. This adds context of
* span ids to give logs more relevance.
*/
RuntimeException errorStoringSpans(List<Span> spans, Throwable e) {
metrics.incrementSpansDropped(spans.size());
// The exception could be related to a span being huge. Instead of filling logs,
// print trace id, span id pairs
StringBuilder msg = appendSpanIds(spans, new StringBuilder("Cannot store spans "));
return doError(msg.toString(), e);
}

RuntimeException doError(String message, Throwable e) {
String exceptionMessage = e.getMessage() != null ? e.getMessage() : "";
if (e instanceof RuntimeException && exceptionMessage.startsWith("Malformed")) {
warn(exceptionMessage, e);
return (RuntimeException) e;
} else {
message = format("%s due to %s(%s)", message, e.getClass().getSimpleName(), exceptionMessage);
warn(message, e);
return new RuntimeException(message, e);
}
}

void warn(String message, Throwable e) {
logger.log(WARNING, message, e);
}

StringBuilder appendSpanIds(List<Span> spans, StringBuilder message) {
message.append("[");
for (Iterator<Span> iterator = spans.iterator(); iterator.hasNext(); ) {
message.append(idString(iterator.next()));
if (iterator.hasNext()) message.append(", ");
}
return message.append("]");
@Override protected void record(List<Span> sampled, Callback<Void> callback) {
storage.asyncSpanConsumer().accept(sampled, callback);
}

String idString(Span span) {
@Override protected String idString(Span span) {
return span.idString();
}
}
Loading

0 comments on commit bcc1ed9

Please sign in to comment.