Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Removes double-conversion when collecting into Elasticsearch #1700

Merged
merged 1 commit into from
Aug 23, 2017
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -23,15 +23,12 @@
import okio.Buffer;
import okio.ByteString;
import zipkin.Annotation;
import zipkin.Span;
import zipkin.internal.Nullable;
import zipkin.internal.Span2;
import zipkin.internal.Span2Converter;
import zipkin.internal.v2.codec.Encoder;
import zipkin.storage.AsyncSpanConsumer;
import zipkin.internal.v2.storage.AsyncSpanConsumer;
import zipkin.storage.Callback;

import static zipkin.internal.ApplyTimestampAndDuration.guessTimestamp;
import static zipkin.internal.Util.propagateIfFatal;
import static zipkin.storage.elasticsearch.http.ElasticsearchHttpSpanStore.SPAN;

Expand All @@ -46,13 +43,13 @@ class ElasticsearchHttpSpanConsumer implements AsyncSpanConsumer { // not final
this.indexNameFormatter = es.indexNameFormatter();
}

@Override public void accept(List<Span> spans, Callback<Void> callback) {
@Override public void accept(List<Span2> spans, Callback<Void> callback) {
if (spans.isEmpty()) {
callback.onSuccess(null);
return;
}
try {
BulkSpanIndexer indexer = newBulkSpanIndexer(es);
BulkSpanIndexer indexer = new BulkSpanIndexer(es);
indexSpans(indexer, spans);
indexer.execute(callback);
} catch (Throwable t) {
Expand All @@ -61,19 +58,17 @@ class ElasticsearchHttpSpanConsumer implements AsyncSpanConsumer { // not final
}
}

void indexSpans(BulkSpanIndexer indexer, List<Span> spans) throws IOException {
for (Span span : spans) {
Long timestamp = guessTimestamp(span);
void indexSpans(BulkSpanIndexer indexer, List<Span2> spans) throws IOException {
for (Span2 span : spans) {
Long spanTimestamp = span.timestamp();
long indexTimestamp = 0L; // which index to store this span into
Long spanTimestamp;
if (timestamp != null) {
indexTimestamp = spanTimestamp = TimeUnit.MICROSECONDS.toMillis(timestamp);
if (spanTimestamp != null) {
indexTimestamp = spanTimestamp = TimeUnit.MICROSECONDS.toMillis(spanTimestamp);
} else {
spanTimestamp = null;
// guessTimestamp is made for determining the span's authoritative timestamp. When choosing
// the index bucket, any annotation is better than using current time.
for (int i = 0, length = span.annotations.size(); i < length; i++) {
indexTimestamp = span.annotations.get(i).timestamp / 1000;
for (int i = 0, length = span.annotations().size(); i < length; i++) {
indexTimestamp = span.annotations().get(i).timestamp / 1000;
break;
}
if (indexTimestamp == 0L) indexTimestamp = System.currentTimeMillis();
Expand All @@ -82,10 +77,6 @@ void indexSpans(BulkSpanIndexer indexer, List<Span> spans) throws IOException {
}
}

BulkSpanIndexer newBulkSpanIndexer(ElasticsearchHttpStorage es) {
return new BulkSpanIndexer(es);
}

static class BulkSpanIndexer {
final HttpBulkIndexer indexer;
final IndexNameFormatter indexNameFormatter;
Expand All @@ -95,12 +86,10 @@ static class BulkSpanIndexer {
this.indexNameFormatter = es.indexNameFormatter();
}

void add(long indexTimestamp, Span span, @Nullable Long timestampMillis) {
void add(long indexTimestamp, Span2 span, @Nullable Long timestampMillis) {
String index = indexNameFormatter.formatTypeAndTimestamp(SPAN, indexTimestamp);
for (Span2 span2 : Span2Converter.fromSpan(span)) {
byte[] document = prefixWithTimestampMillisAndQuery(span2, timestampMillis);
indexer.add(index, SPAN, document, null /* Allow ES to choose an ID */);
}
byte[] document = prefixWithTimestampMillisAndQuery(span, timestampMillis);
indexer.add(index, SPAN, document, null /* Allow ES to choose an ID */);
}

void execute(Callback<Void> callback) throws IOException {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,9 @@
import okhttp3.Request;
import okhttp3.RequestBody;
import okio.Buffer;
import zipkin.internal.AsyncSpan2ConsumerAdapter;
import zipkin.internal.Nullable;
import zipkin.internal.Span2Component;
import zipkin.storage.AsyncSpanConsumer;
import zipkin.storage.AsyncSpanStore;
import zipkin.storage.SpanStore;
Expand All @@ -42,8 +44,7 @@
import static zipkin.storage.elasticsearch.http.ElasticsearchHttpSpanStore.SPAN;

@AutoValue
public abstract class ElasticsearchHttpStorage implements StorageComponent {

public abstract class ElasticsearchHttpStorage extends Span2Component implements StorageComponent {
/**
* A list of elasticsearch nodes to connect to, in http://host:port or https://host:port
* format. Note this value is only read once.
Expand Down Expand Up @@ -222,6 +223,10 @@ public final Builder dateSeparator(char dateSeparator) {
}

@Override public AsyncSpanConsumer asyncSpanConsumer() {
return AsyncSpan2ConsumerAdapter.create(asyncSpan2Consumer());
}

@Override protected zipkin.internal.v2.storage.AsyncSpanConsumer asyncSpan2Consumer() {
ensureIndexTemplates();
return new ElasticsearchHttpSpanConsumer(this);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,23 +22,17 @@
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
import zipkin.Annotation;
import zipkin.BinaryAnnotation;
import zipkin.Codec;
import zipkin.Span;
import zipkin.TestObjects;
import zipkin.internal.CallbackCaptor;
import zipkin.internal.Span2;
import zipkin.internal.Span2Converter;
import zipkin.internal.v2.codec.MessageEncoder;
import zipkin.internal.Span2.Kind;
import zipkin.internal.v2.codec.Decoder;
import zipkin.internal.v2.codec.Encoder;
import zipkin.internal.v2.codec.MessageEncoder;

import static java.util.Arrays.asList;
import static org.assertj.core.api.Assertions.assertThat;
import static zipkin.Constants.CLIENT_SEND;
import static zipkin.Constants.SERVER_RECV;
import static zipkin.TestObjects.TODAY;
import static zipkin.internal.ApplyTimestampAndDuration.guessTimestamp;
import static zipkin.internal.Util.UTF_8;
import static zipkin.storage.elasticsearch.http.ElasticsearchHttpSpanConsumer.prefixWithTimestampMillisAndQuery;

Expand Down Expand Up @@ -69,7 +63,7 @@ public void close() throws IOException {
@Test public void addsTimestamp_millisIntoJson() throws Exception {
es.enqueue(new MockResponse());

Span span = Span.builder().traceId(20L).id(20L).name("get")
Span2 span = Span2.builder().traceId(20L).id(20L).name("get")
.timestamp(TODAY * 1000).build();

accept(span);
Expand All @@ -79,77 +73,69 @@ public void close() throws IOException {
}

@Test public void prefixWithTimestampMillisAndQuery_skipsWhenNoData() throws Exception {
Span span = Span.builder().traceId(20L).id(22L).name("").parentId(21L).timestamp(0L)
.addAnnotation(Annotation.create(0, CLIENT_SEND, TestObjects.WEB_ENDPOINT))
Span2 span = Span2.builder().traceId(20L).id(22L).name("").parentId(21L).timestamp(0L)
.localEndpoint(TestObjects.WEB_ENDPOINT)
.kind(Kind.CLIENT)
.build();

byte[] result = prefixWithTimestampMillisAndQuery(
Span2Converter.fromSpan(span).get(0),
span.timestamp
);
byte[] result = prefixWithTimestampMillisAndQuery(span, span.timestamp());

assertThat(new String(result, UTF_8))
.startsWith("{\"traceId\":\"");
}

@Test public void prefixWithTimestampMillisAndQuery_addsTimestampMillis() throws Exception {
Span span = Span.builder().traceId(20L).id(22L).name("").parentId(21L).timestamp(1L)
.addAnnotation(Annotation.create(1L, CLIENT_SEND, TestObjects.WEB_ENDPOINT))
Span2 span = Span2.builder().traceId(20L).id(22L).name("").parentId(21L).timestamp(1L)
.localEndpoint(TestObjects.WEB_ENDPOINT)
.kind(Kind.CLIENT)
.build();

byte[] result = prefixWithTimestampMillisAndQuery(
Span2Converter.fromSpan(span).get(0),
span.timestamp
);
byte[] result = prefixWithTimestampMillisAndQuery(span, span.timestamp());

assertThat(new String(result, UTF_8))
.startsWith("{\"timestamp_millis\":1,\"traceId\":");
}

@Test public void prefixWithTimestampMillisAndQuery_addsAnnotationQuery() throws Exception {
Span span = Span.builder().traceId(20L).id(22L).name("").parentId(21L)
.addAnnotation(Annotation.create(1L, "\"foo", TestObjects.WEB_ENDPOINT))
Span2 span = Span2.builder().traceId(20L).id(22L).name("").parentId(21L)
.localEndpoint(TestObjects.WEB_ENDPOINT)
.addAnnotation(1L, "\"foo")
.build();

byte[] result = prefixWithTimestampMillisAndQuery(
Span2Converter.fromSpan(span).get(0),
span.timestamp
);
byte[] result = prefixWithTimestampMillisAndQuery(span, span.timestamp());

assertThat(new String(result, UTF_8))
.startsWith("{\"_q\":[\"\\\"foo\"],\"traceId");
}

@Test public void prefixWithTimestampMillisAndQuery_addsAnnotationQueryTags() throws Exception {
Span span = Span.builder().traceId(20L).id(22L).name("").parentId(21L)
.addBinaryAnnotation(BinaryAnnotation.create("\"foo", "\"bar", TestObjects.WEB_ENDPOINT))
Span2 span = Span2.builder().traceId(20L).id(22L).name("").parentId(21L)
.localEndpoint(TestObjects.WEB_ENDPOINT)
.putTag("\"foo", "\"bar")
.build();

byte[] result = prefixWithTimestampMillisAndQuery(Span2Converter.fromSpan(span).get(0),
span.timestamp
);
byte[] result = prefixWithTimestampMillisAndQuery(span, span.timestamp());

assertThat(new String(result, UTF_8))
.startsWith("{\"_q\":[\"\\\"foo\",\"\\\"foo=\\\"bar\"],\"traceId");
}

@Test public void prefixWithTimestampMillisAndQuery_readable() throws Exception {
Span span = Span.builder().traceId(20L).id(20L).name("get")
Span2 span = Span2.builder().traceId(20L).id(20L).name("get")
.timestamp(TODAY * 1000).build();
Span2 span2 = Span2Converter.fromSpan(span).get(0);

byte[] message = MessageEncoder.JSON_BYTES.encode(asList(
prefixWithTimestampMillisAndQuery(span2, span.timestamp)
prefixWithTimestampMillisAndQuery(span, span.timestamp())
));

assertThat(Decoder.JSON.decodeList(message))
.containsOnly(span2); // ignores timestamp_millis field
.containsOnly(span); // ignores timestamp_millis field
}

@Test public void doesntWriteSpanId() throws Exception {
@Test public void doesntWriteDocumentId() throws Exception {
es.enqueue(new MockResponse());

accept(TestObjects.LOTS_OF_SPANS[0]);
accept(Span2.builder().traceId(1L).id(1L).name("foo").build());

RecordedRequest request = es.takeRequest();
assertThat(request.getBody().readByteString().utf8())
Expand All @@ -159,30 +145,34 @@ public void close() throws IOException {
@Test public void writesSpanNaturallyWhenNoTimestamp() throws Exception {
es.enqueue(new MockResponse());

Span span = Span.builder().traceId(1L).id(1L).name("foo").build();
accept(span);
Span2 span = Span2.builder().traceId(1L).id(1L).name("foo").build();
accept(Span2.builder().traceId(1L).id(1L).name("foo").build());

assertThat(es.takeRequest().getBody().readByteString().utf8())
.contains("\n" + new String(Codec.JSON.writeSpan(span), UTF_8) + "\n");
.contains("\n" + new String(Encoder.JSON.encode(span), UTF_8) + "\n");
}

@Test public void traceIsSearchableBySRServiceName() throws Exception {
@Test public void traceIsSearchableByServerServiceName() throws Exception {
es.enqueue(new MockResponse());

Span clientSpan = Span.builder().traceId(20L).id(22L).name("").parentId(21L).timestamp(0L)
.addAnnotation(Annotation.create(0, CLIENT_SEND, TestObjects.WEB_ENDPOINT))
Span2 clientSpan = Span2.builder().traceId(20L).id(22L).name("").parentId(21L)
.timestamp(1000L)
.kind(Kind.CLIENT)
.localEndpoint(TestObjects.WEB_ENDPOINT)
.build();

Span serverSpan = Span.builder().traceId(20L).id(22L).name("get").parentId(21L)
.addAnnotation(Annotation.create(1000, SERVER_RECV, TestObjects.APP_ENDPOINT))
Span2 serverSpan = Span2.builder().traceId(20L).id(22L).name("get").parentId(21L)
.timestamp(2000L)
.kind(Kind.SERVER)
.localEndpoint(TestObjects.APP_ENDPOINT)
.build();

accept(serverSpan, clientSpan);

// make sure that both timestamps are in the index
assertThat(es.takeRequest().getBody().readByteString().utf8())
.contains("{\"timestamp_millis\":1")
.contains("{\"timestamp_millis\":0");
.contains("{\"timestamp_millis\":2")
.contains("{\"timestamp_millis\":1");
}

@Test public void addsPipelineId() throws Exception {
Expand All @@ -196,7 +186,7 @@ public void close() throws IOException {

es.enqueue(new MockResponse());

accept(TestObjects.TRACE.get(0));
accept(Span2.builder().traceId(1L).id(1L).name("foo").build());

RecordedRequest request = es.takeRequest();
assertThat(request.getPath())
Expand All @@ -206,17 +196,13 @@ public void close() throws IOException {
@Test public void choosesTypeSpecificIndex() throws Exception {
es.enqueue(new MockResponse());

Annotation foo = Annotation.create(
TimeUnit.DAYS.toMicros(365), // 1971-01-01
"foo",
TestObjects.APP_ENDPOINT
);

Span span = Span.builder().traceId(1L).id(2L).parentId(1L).name("s").addAnnotation(foo).build();
Span2 span = Span2.builder().traceId(1L).id(2L).parentId(1L).name("s")
.localEndpoint(TestObjects.APP_ENDPOINT)
.addAnnotation(TimeUnit.DAYS.toMicros(365) /* 1971-01-01 */, "foo")
.build();

// sanity check data
assertThat(span.timestamp).isNull();
assertThat(guessTimestamp(span)).isNull();
assertThat(span.timestamp()).isNull();

accept(span);

Expand All @@ -226,9 +212,9 @@ public void close() throws IOException {
);
}

void accept(Span... spans) throws Exception {
void accept(Span2... spans) throws Exception {
CallbackCaptor<Void> callback = new CallbackCaptor<>();
storage.asyncSpanConsumer().accept(asList(spans), callback);
storage.asyncSpan2Consumer().accept(asList(spans), callback);
callback.get();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,7 @@ public class ElasticsearchHttpStorageTest {
es.enqueue(new MockResponse()); // get dependency template

// check this isn't the legacy consumer
assertThat(storage.asyncSpanConsumer())
assertThat(storage.asyncSpan2Consumer())
.isInstanceOf(ElasticsearchHttpSpanConsumer.class);
// check this isn't the double reading span store
assertThat(storage.asyncSpanStore())
Expand All @@ -101,9 +101,6 @@ public class ElasticsearchHttpStorageTest {
es.enqueue(new MockResponse()); // get span template
es.enqueue(new MockResponse()); // get dependency template

// check this isn't the legacy consumer
assertThat(storage.asyncSpanConsumer())
.isInstanceOf(ElasticsearchHttpSpanConsumer.class);
// check that we do double-reads on the legacy and new format
assertThat(storage.asyncSpanStore())
.isInstanceOf(LenientDoubleCallbackAsyncSpanStore.class);
Expand All @@ -124,9 +121,6 @@ public class ElasticsearchHttpStorageTest {
es.enqueue(new MockResponse()); // get span template
es.enqueue(new MockResponse()); // get dependency template

// check this isn't the legacy consumer
assertThat(storage.asyncSpanConsumer())
.isInstanceOf(ElasticsearchHttpSpanConsumer.class);
// check this isn't the double reading span store
assertThat(storage.asyncSpanStore())
.isInstanceOf(ElasticsearchHttpSpanStore.class);
Expand Down
Loading