diff --git a/zipkin-storage/elasticsearch-http/src/main/java/zipkin/storage/elasticsearch/http/ElasticsearchHttpSpanConsumer.java b/zipkin-storage/elasticsearch-http/src/main/java/zipkin/storage/elasticsearch/http/ElasticsearchHttpSpanConsumer.java index 6dcdfbd6e10..199ac9731bf 100644 --- a/zipkin-storage/elasticsearch-http/src/main/java/zipkin/storage/elasticsearch/http/ElasticsearchHttpSpanConsumer.java +++ b/zipkin-storage/elasticsearch-http/src/main/java/zipkin/storage/elasticsearch/http/ElasticsearchHttpSpanConsumer.java @@ -23,15 +23,12 @@ import okio.Buffer; import okio.ByteString; import zipkin.Annotation; -import zipkin.Span; import zipkin.internal.Nullable; import zipkin.internal.Span2; -import zipkin.internal.Span2Converter; import zipkin.internal.v2.codec.Encoder; -import zipkin.storage.AsyncSpanConsumer; +import zipkin.internal.v2.storage.AsyncSpanConsumer; import zipkin.storage.Callback; -import static zipkin.internal.ApplyTimestampAndDuration.guessTimestamp; import static zipkin.internal.Util.propagateIfFatal; import static zipkin.storage.elasticsearch.http.ElasticsearchHttpSpanStore.SPAN; @@ -46,13 +43,13 @@ class ElasticsearchHttpSpanConsumer implements AsyncSpanConsumer { // not final this.indexNameFormatter = es.indexNameFormatter(); } - @Override public void accept(List spans, Callback callback) { + @Override public void accept(List spans, Callback callback) { if (spans.isEmpty()) { callback.onSuccess(null); return; } try { - BulkSpanIndexer indexer = newBulkSpanIndexer(es); + BulkSpanIndexer indexer = new BulkSpanIndexer(es); indexSpans(indexer, spans); indexer.execute(callback); } catch (Throwable t) { @@ -61,19 +58,17 @@ class ElasticsearchHttpSpanConsumer implements AsyncSpanConsumer { // not final } } - void indexSpans(BulkSpanIndexer indexer, List spans) throws IOException { - for (Span span : spans) { - Long timestamp = guessTimestamp(span); + void indexSpans(BulkSpanIndexer indexer, List spans) throws IOException { + for (Span2 span : spans) { + Long spanTimestamp = span.timestamp(); long indexTimestamp = 0L; // which index to store this span into - Long spanTimestamp; - if (timestamp != null) { - indexTimestamp = spanTimestamp = TimeUnit.MICROSECONDS.toMillis(timestamp); + if (spanTimestamp != null) { + indexTimestamp = spanTimestamp = TimeUnit.MICROSECONDS.toMillis(spanTimestamp); } else { - spanTimestamp = null; // guessTimestamp is made for determining the span's authoritative timestamp. When choosing // the index bucket, any annotation is better than using current time. - for (int i = 0, length = span.annotations.size(); i < length; i++) { - indexTimestamp = span.annotations.get(i).timestamp / 1000; + for (int i = 0, length = span.annotations().size(); i < length; i++) { + indexTimestamp = span.annotations().get(i).timestamp / 1000; break; } if (indexTimestamp == 0L) indexTimestamp = System.currentTimeMillis(); @@ -82,10 +77,6 @@ void indexSpans(BulkSpanIndexer indexer, List spans) throws IOException { } } - BulkSpanIndexer newBulkSpanIndexer(ElasticsearchHttpStorage es) { - return new BulkSpanIndexer(es); - } - static class BulkSpanIndexer { final HttpBulkIndexer indexer; final IndexNameFormatter indexNameFormatter; @@ -95,12 +86,10 @@ static class BulkSpanIndexer { this.indexNameFormatter = es.indexNameFormatter(); } - void add(long indexTimestamp, Span span, @Nullable Long timestampMillis) { + void add(long indexTimestamp, Span2 span, @Nullable Long timestampMillis) { String index = indexNameFormatter.formatTypeAndTimestamp(SPAN, indexTimestamp); - for (Span2 span2 : Span2Converter.fromSpan(span)) { - byte[] document = prefixWithTimestampMillisAndQuery(span2, timestampMillis); - indexer.add(index, SPAN, document, null /* Allow ES to choose an ID */); - } + byte[] document = prefixWithTimestampMillisAndQuery(span, timestampMillis); + indexer.add(index, SPAN, document, null /* Allow ES to choose an ID */); } void execute(Callback callback) throws IOException { diff --git a/zipkin-storage/elasticsearch-http/src/main/java/zipkin/storage/elasticsearch/http/ElasticsearchHttpStorage.java b/zipkin-storage/elasticsearch-http/src/main/java/zipkin/storage/elasticsearch/http/ElasticsearchHttpStorage.java index 898ab881747..ebd0e9d949d 100644 --- a/zipkin-storage/elasticsearch-http/src/main/java/zipkin/storage/elasticsearch/http/ElasticsearchHttpStorage.java +++ b/zipkin-storage/elasticsearch-http/src/main/java/zipkin/storage/elasticsearch/http/ElasticsearchHttpStorage.java @@ -27,7 +27,9 @@ import okhttp3.Request; import okhttp3.RequestBody; import okio.Buffer; +import zipkin.internal.AsyncSpan2ConsumerAdapter; import zipkin.internal.Nullable; +import zipkin.internal.Span2Component; import zipkin.storage.AsyncSpanConsumer; import zipkin.storage.AsyncSpanStore; import zipkin.storage.SpanStore; @@ -42,8 +44,7 @@ import static zipkin.storage.elasticsearch.http.ElasticsearchHttpSpanStore.SPAN; @AutoValue -public abstract class ElasticsearchHttpStorage implements StorageComponent { - +public abstract class ElasticsearchHttpStorage extends Span2Component implements StorageComponent { /** * A list of elasticsearch nodes to connect to, in http://host:port or https://host:port * format. Note this value is only read once. @@ -222,6 +223,10 @@ public final Builder dateSeparator(char dateSeparator) { } @Override public AsyncSpanConsumer asyncSpanConsumer() { + return AsyncSpan2ConsumerAdapter.create(asyncSpan2Consumer()); + } + + @Override protected zipkin.internal.v2.storage.AsyncSpanConsumer asyncSpan2Consumer() { ensureIndexTemplates(); return new ElasticsearchHttpSpanConsumer(this); } diff --git a/zipkin-storage/elasticsearch-http/src/test/java/zipkin/storage/elasticsearch/http/ElasticsearchHttpSpanConsumerTest.java b/zipkin-storage/elasticsearch-http/src/test/java/zipkin/storage/elasticsearch/http/ElasticsearchHttpSpanConsumerTest.java index 0e5ab10a1ab..b93225af280 100644 --- a/zipkin-storage/elasticsearch-http/src/test/java/zipkin/storage/elasticsearch/http/ElasticsearchHttpSpanConsumerTest.java +++ b/zipkin-storage/elasticsearch-http/src/test/java/zipkin/storage/elasticsearch/http/ElasticsearchHttpSpanConsumerTest.java @@ -22,23 +22,17 @@ import org.junit.Before; import org.junit.Rule; import org.junit.Test; -import zipkin.Annotation; -import zipkin.BinaryAnnotation; -import zipkin.Codec; -import zipkin.Span; import zipkin.TestObjects; import zipkin.internal.CallbackCaptor; import zipkin.internal.Span2; -import zipkin.internal.Span2Converter; -import zipkin.internal.v2.codec.MessageEncoder; +import zipkin.internal.Span2.Kind; import zipkin.internal.v2.codec.Decoder; +import zipkin.internal.v2.codec.Encoder; +import zipkin.internal.v2.codec.MessageEncoder; import static java.util.Arrays.asList; import static org.assertj.core.api.Assertions.assertThat; -import static zipkin.Constants.CLIENT_SEND; -import static zipkin.Constants.SERVER_RECV; import static zipkin.TestObjects.TODAY; -import static zipkin.internal.ApplyTimestampAndDuration.guessTimestamp; import static zipkin.internal.Util.UTF_8; import static zipkin.storage.elasticsearch.http.ElasticsearchHttpSpanConsumer.prefixWithTimestampMillisAndQuery; @@ -69,7 +63,7 @@ public void close() throws IOException { @Test public void addsTimestamp_millisIntoJson() throws Exception { es.enqueue(new MockResponse()); - Span span = Span.builder().traceId(20L).id(20L).name("get") + Span2 span = Span2.builder().traceId(20L).id(20L).name("get") .timestamp(TODAY * 1000).build(); accept(span); @@ -79,77 +73,69 @@ public void close() throws IOException { } @Test public void prefixWithTimestampMillisAndQuery_skipsWhenNoData() throws Exception { - Span span = Span.builder().traceId(20L).id(22L).name("").parentId(21L).timestamp(0L) - .addAnnotation(Annotation.create(0, CLIENT_SEND, TestObjects.WEB_ENDPOINT)) + Span2 span = Span2.builder().traceId(20L).id(22L).name("").parentId(21L).timestamp(0L) + .localEndpoint(TestObjects.WEB_ENDPOINT) + .kind(Kind.CLIENT) .build(); - byte[] result = prefixWithTimestampMillisAndQuery( - Span2Converter.fromSpan(span).get(0), - span.timestamp - ); + byte[] result = prefixWithTimestampMillisAndQuery(span, span.timestamp()); assertThat(new String(result, UTF_8)) .startsWith("{\"traceId\":\""); } @Test public void prefixWithTimestampMillisAndQuery_addsTimestampMillis() throws Exception { - Span span = Span.builder().traceId(20L).id(22L).name("").parentId(21L).timestamp(1L) - .addAnnotation(Annotation.create(1L, CLIENT_SEND, TestObjects.WEB_ENDPOINT)) + Span2 span = Span2.builder().traceId(20L).id(22L).name("").parentId(21L).timestamp(1L) + .localEndpoint(TestObjects.WEB_ENDPOINT) + .kind(Kind.CLIENT) .build(); - byte[] result = prefixWithTimestampMillisAndQuery( - Span2Converter.fromSpan(span).get(0), - span.timestamp - ); + byte[] result = prefixWithTimestampMillisAndQuery(span, span.timestamp()); assertThat(new String(result, UTF_8)) .startsWith("{\"timestamp_millis\":1,\"traceId\":"); } @Test public void prefixWithTimestampMillisAndQuery_addsAnnotationQuery() throws Exception { - Span span = Span.builder().traceId(20L).id(22L).name("").parentId(21L) - .addAnnotation(Annotation.create(1L, "\"foo", TestObjects.WEB_ENDPOINT)) + Span2 span = Span2.builder().traceId(20L).id(22L).name("").parentId(21L) + .localEndpoint(TestObjects.WEB_ENDPOINT) + .addAnnotation(1L, "\"foo") .build(); - byte[] result = prefixWithTimestampMillisAndQuery( - Span2Converter.fromSpan(span).get(0), - span.timestamp - ); + byte[] result = prefixWithTimestampMillisAndQuery(span, span.timestamp()); assertThat(new String(result, UTF_8)) .startsWith("{\"_q\":[\"\\\"foo\"],\"traceId"); } @Test public void prefixWithTimestampMillisAndQuery_addsAnnotationQueryTags() throws Exception { - Span span = Span.builder().traceId(20L).id(22L).name("").parentId(21L) - .addBinaryAnnotation(BinaryAnnotation.create("\"foo", "\"bar", TestObjects.WEB_ENDPOINT)) + Span2 span = Span2.builder().traceId(20L).id(22L).name("").parentId(21L) + .localEndpoint(TestObjects.WEB_ENDPOINT) + .putTag("\"foo", "\"bar") .build(); - byte[] result = prefixWithTimestampMillisAndQuery(Span2Converter.fromSpan(span).get(0), - span.timestamp - ); + byte[] result = prefixWithTimestampMillisAndQuery(span, span.timestamp()); assertThat(new String(result, UTF_8)) .startsWith("{\"_q\":[\"\\\"foo\",\"\\\"foo=\\\"bar\"],\"traceId"); } @Test public void prefixWithTimestampMillisAndQuery_readable() throws Exception { - Span span = Span.builder().traceId(20L).id(20L).name("get") + Span2 span = Span2.builder().traceId(20L).id(20L).name("get") .timestamp(TODAY * 1000).build(); - Span2 span2 = Span2Converter.fromSpan(span).get(0); byte[] message = MessageEncoder.JSON_BYTES.encode(asList( - prefixWithTimestampMillisAndQuery(span2, span.timestamp) + prefixWithTimestampMillisAndQuery(span, span.timestamp()) )); assertThat(Decoder.JSON.decodeList(message)) - .containsOnly(span2); // ignores timestamp_millis field + .containsOnly(span); // ignores timestamp_millis field } - @Test public void doesntWriteSpanId() throws Exception { + @Test public void doesntWriteDocumentId() throws Exception { es.enqueue(new MockResponse()); - accept(TestObjects.LOTS_OF_SPANS[0]); + accept(Span2.builder().traceId(1L).id(1L).name("foo").build()); RecordedRequest request = es.takeRequest(); assertThat(request.getBody().readByteString().utf8()) @@ -159,30 +145,34 @@ public void close() throws IOException { @Test public void writesSpanNaturallyWhenNoTimestamp() throws Exception { es.enqueue(new MockResponse()); - Span span = Span.builder().traceId(1L).id(1L).name("foo").build(); - accept(span); + Span2 span = Span2.builder().traceId(1L).id(1L).name("foo").build(); + accept(Span2.builder().traceId(1L).id(1L).name("foo").build()); assertThat(es.takeRequest().getBody().readByteString().utf8()) - .contains("\n" + new String(Codec.JSON.writeSpan(span), UTF_8) + "\n"); + .contains("\n" + new String(Encoder.JSON.encode(span), UTF_8) + "\n"); } - @Test public void traceIsSearchableBySRServiceName() throws Exception { + @Test public void traceIsSearchableByServerServiceName() throws Exception { es.enqueue(new MockResponse()); - Span clientSpan = Span.builder().traceId(20L).id(22L).name("").parentId(21L).timestamp(0L) - .addAnnotation(Annotation.create(0, CLIENT_SEND, TestObjects.WEB_ENDPOINT)) + Span2 clientSpan = Span2.builder().traceId(20L).id(22L).name("").parentId(21L) + .timestamp(1000L) + .kind(Kind.CLIENT) + .localEndpoint(TestObjects.WEB_ENDPOINT) .build(); - Span serverSpan = Span.builder().traceId(20L).id(22L).name("get").parentId(21L) - .addAnnotation(Annotation.create(1000, SERVER_RECV, TestObjects.APP_ENDPOINT)) + Span2 serverSpan = Span2.builder().traceId(20L).id(22L).name("get").parentId(21L) + .timestamp(2000L) + .kind(Kind.SERVER) + .localEndpoint(TestObjects.APP_ENDPOINT) .build(); accept(serverSpan, clientSpan); // make sure that both timestamps are in the index assertThat(es.takeRequest().getBody().readByteString().utf8()) - .contains("{\"timestamp_millis\":1") - .contains("{\"timestamp_millis\":0"); + .contains("{\"timestamp_millis\":2") + .contains("{\"timestamp_millis\":1"); } @Test public void addsPipelineId() throws Exception { @@ -196,7 +186,7 @@ public void close() throws IOException { es.enqueue(new MockResponse()); - accept(TestObjects.TRACE.get(0)); + accept(Span2.builder().traceId(1L).id(1L).name("foo").build()); RecordedRequest request = es.takeRequest(); assertThat(request.getPath()) @@ -206,17 +196,13 @@ public void close() throws IOException { @Test public void choosesTypeSpecificIndex() throws Exception { es.enqueue(new MockResponse()); - Annotation foo = Annotation.create( - TimeUnit.DAYS.toMicros(365), // 1971-01-01 - "foo", - TestObjects.APP_ENDPOINT - ); - - Span span = Span.builder().traceId(1L).id(2L).parentId(1L).name("s").addAnnotation(foo).build(); + Span2 span = Span2.builder().traceId(1L).id(2L).parentId(1L).name("s") + .localEndpoint(TestObjects.APP_ENDPOINT) + .addAnnotation(TimeUnit.DAYS.toMicros(365) /* 1971-01-01 */, "foo") + .build(); // sanity check data - assertThat(span.timestamp).isNull(); - assertThat(guessTimestamp(span)).isNull(); + assertThat(span.timestamp()).isNull(); accept(span); @@ -226,9 +212,9 @@ public void close() throws IOException { ); } - void accept(Span... spans) throws Exception { + void accept(Span2... spans) throws Exception { CallbackCaptor callback = new CallbackCaptor<>(); - storage.asyncSpanConsumer().accept(asList(spans), callback); + storage.asyncSpan2Consumer().accept(asList(spans), callback); callback.get(); } } diff --git a/zipkin-storage/elasticsearch-http/src/test/java/zipkin/storage/elasticsearch/http/ElasticsearchHttpStorageTest.java b/zipkin-storage/elasticsearch-http/src/test/java/zipkin/storage/elasticsearch/http/ElasticsearchHttpStorageTest.java index a7f330422ec..f28d8609477 100644 --- a/zipkin-storage/elasticsearch-http/src/test/java/zipkin/storage/elasticsearch/http/ElasticsearchHttpStorageTest.java +++ b/zipkin-storage/elasticsearch-http/src/test/java/zipkin/storage/elasticsearch/http/ElasticsearchHttpStorageTest.java @@ -80,7 +80,7 @@ public class ElasticsearchHttpStorageTest { es.enqueue(new MockResponse()); // get dependency template // check this isn't the legacy consumer - assertThat(storage.asyncSpanConsumer()) + assertThat(storage.asyncSpan2Consumer()) .isInstanceOf(ElasticsearchHttpSpanConsumer.class); // check this isn't the double reading span store assertThat(storage.asyncSpanStore()) @@ -101,9 +101,6 @@ public class ElasticsearchHttpStorageTest { es.enqueue(new MockResponse()); // get span template es.enqueue(new MockResponse()); // get dependency template - // check this isn't the legacy consumer - assertThat(storage.asyncSpanConsumer()) - .isInstanceOf(ElasticsearchHttpSpanConsumer.class); // check that we do double-reads on the legacy and new format assertThat(storage.asyncSpanStore()) .isInstanceOf(LenientDoubleCallbackAsyncSpanStore.class); @@ -124,9 +121,6 @@ public class ElasticsearchHttpStorageTest { es.enqueue(new MockResponse()); // get span template es.enqueue(new MockResponse()); // get dependency template - // check this isn't the legacy consumer - assertThat(storage.asyncSpanConsumer()) - .isInstanceOf(ElasticsearchHttpSpanConsumer.class); // check this isn't the double reading span store assertThat(storage.asyncSpanStore()) .isInstanceOf(ElasticsearchHttpSpanStore.class); diff --git a/zipkin/src/main/java/zipkin/collector/Collector.java b/zipkin/src/main/java/zipkin/collector/Collector.java index fff30bdcea0..ef2e691f5d7 100644 --- a/zipkin/src/main/java/zipkin/collector/Collector.java +++ b/zipkin/src/main/java/zipkin/collector/Collector.java @@ -18,7 +18,13 @@ import java.util.logging.Logger; import zipkin.Span; import zipkin.SpanDecoder; +import zipkin.internal.Collector2; import zipkin.internal.DetectingSpanDecoder; +import zipkin.internal.Span2; +import zipkin.internal.Span2Component; +import zipkin.internal.Span2Converter; +import zipkin.internal.Span2JsonSpanDecoder; +import zipkin.internal.v2.codec.Decoder; import zipkin.storage.Callback; import zipkin.storage.StorageComponent; @@ -75,11 +81,22 @@ public Collector build() { final CollectorSampler sampler; final StorageComponent storage; + final Collector2 storage2; Collector(Builder builder) { super(builder.logger, builder.metrics); this.storage = checkNotNull(builder.storage, "storage"); this.sampler = builder.sampler == null ? CollectorSampler.ALWAYS_SAMPLE : builder.sampler; + if (storage instanceof Span2Component) { + storage2 = new Collector2( + builder.logger, + builder.metrics, + builder.sampler, + (Span2Component) storage + ); + } else { + storage2 = null; + } } @Override @@ -91,7 +108,11 @@ public void acceptSpans(byte[] serializedSpans, SpanDecoder decoder, Callback spans, Callback callback) { + if (storage2 != null) { + int length = spans.size(); + List span2s = new ArrayList<>(length); + for (int i = 0; i < length; i++) { + span2s.addAll(Span2Converter.fromSpan(spans.get(i))); + } + storage2.accept(span2s, callback); + } else { + super.accept(spans, callback); + } + } + @Override protected List decodeList(SpanDecoder decoder, byte[] serialized) { return decoder.readSpans(serialized); } diff --git a/zipkin/src/main/java/zipkin/internal/AsyncSpan2ConsumerAdapter.java b/zipkin/src/main/java/zipkin/internal/AsyncSpan2ConsumerAdapter.java new file mode 100644 index 00000000000..322e21fa476 --- /dev/null +++ b/zipkin/src/main/java/zipkin/internal/AsyncSpan2ConsumerAdapter.java @@ -0,0 +1,42 @@ +/** + * Copyright 2015-2017 The OpenZipkin Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except + * in compliance with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License + * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express + * or implied. See the License for the specific language governing permissions and limitations under + * the License. + */ +package zipkin.internal; + +import java.util.ArrayList; +import java.util.List; +import zipkin.Span; +import zipkin.storage.AsyncSpanConsumer; +import zipkin.storage.Callback; + +public final class AsyncSpan2ConsumerAdapter implements AsyncSpanConsumer { + + public static AsyncSpanConsumer create(zipkin.internal.v2.storage.AsyncSpanConsumer delegate) { + return new AsyncSpan2ConsumerAdapter(delegate); + } + + final zipkin.internal.v2.storage.AsyncSpanConsumer delegate; + + AsyncSpan2ConsumerAdapter(zipkin.internal.v2.storage.AsyncSpanConsumer delegate) { + this.delegate = delegate; + } + + @Override public void accept(List spans, Callback callback) { + int length = spans.size(); + List linkSpans = new ArrayList<>(length); + for (int i = 0; i < length; i++) { + linkSpans.addAll(Span2Converter.fromSpan(spans.get(i))); + } + delegate.accept(linkSpans, callback); + } +} diff --git a/zipkin/src/main/java/zipkin/internal/Collector2.java b/zipkin/src/main/java/zipkin/internal/Collector2.java new file mode 100644 index 00000000000..31244b93320 --- /dev/null +++ b/zipkin/src/main/java/zipkin/internal/Collector2.java @@ -0,0 +1,56 @@ +/** + * Copyright 2015-2017 The OpenZipkin Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except + * in compliance with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License + * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express + * or implied. See the License for the specific language governing permissions and limitations under + * the License. + */ +package zipkin.internal; + +import java.util.List; +import java.util.logging.Logger; +import zipkin.collector.CollectorMetrics; +import zipkin.collector.CollectorSampler; +import zipkin.internal.v2.codec.Decoder; +import zipkin.storage.Callback; + +import static zipkin.internal.Util.checkNotNull; + +public final class Collector2 extends Collector, Span2> { + final Span2Component storage; + final CollectorSampler sampler; + + public Collector2(Logger logger, @Nullable CollectorMetrics metrics, + @Nullable CollectorSampler sampler, Span2Component storage) { + super(logger, metrics); + this.storage = checkNotNull(storage, "storage"); + this.sampler = sampler == null ? CollectorSampler.ALWAYS_SAMPLE : sampler; + } + + @Override + public void acceptSpans(byte[] serializedSpans, Decoder decoder, Callback callback) { + super.acceptSpans(serializedSpans, decoder, callback); + } + + @Override protected List decodeList(Decoder decoder, byte[] serialized) { + return decoder.decodeList(serialized); + } + + @Override protected boolean isSampled(Span2 span) { + return sampler.isSampled(span.traceId(), span.debug()); + } + + @Override protected void record(List sampled, Callback callback) { + storage.asyncSpan2Consumer().accept(sampled, callback); + } + + @Override protected String idString(Span2 span) { + return span.idString(); + } +} diff --git a/zipkin/src/main/java/zipkin/internal/Span2Component.java b/zipkin/src/main/java/zipkin/internal/Span2Component.java new file mode 100644 index 00000000000..6ab12d958a6 --- /dev/null +++ b/zipkin/src/main/java/zipkin/internal/Span2Component.java @@ -0,0 +1,21 @@ +/** + * Copyright 2015-2017 The OpenZipkin Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except + * in compliance with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License + * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express + * or implied. See the License for the specific language governing permissions and limitations under + * the License. + */ +package zipkin.internal; + +import zipkin.internal.v2.storage.AsyncSpanConsumer; + +// temporary type until other classes around this complete +public abstract class Span2Component { + protected abstract AsyncSpanConsumer asyncSpan2Consumer(); +} diff --git a/zipkin/src/main/java/zipkin/internal/v2/storage/AsyncSpanConsumer.java b/zipkin/src/main/java/zipkin/internal/v2/storage/AsyncSpanConsumer.java new file mode 100644 index 00000000000..31bfe2616a8 --- /dev/null +++ b/zipkin/src/main/java/zipkin/internal/v2/storage/AsyncSpanConsumer.java @@ -0,0 +1,24 @@ +/** + * Copyright 2015-2017 The OpenZipkin Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except + * in compliance with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License + * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express + * or implied. See the License for the specific language governing permissions and limitations under + * the License. + */ +package zipkin.internal.v2.storage; + +import java.util.List; +import zipkin.internal.Span2; +import zipkin.storage.Callback; + +// @FunctionalInterface +public interface AsyncSpanConsumer { + + void accept(List spans, Callback callback); +} diff --git a/zipkin/src/test/java/zipkin/collector/CollectorTest.java b/zipkin/src/test/java/zipkin/collector/CollectorTest.java index 4522a576646..cacd0e7da6c 100644 --- a/zipkin/src/test/java/zipkin/collector/CollectorTest.java +++ b/zipkin/src/test/java/zipkin/collector/CollectorTest.java @@ -20,14 +20,20 @@ import zipkin.internal.ApplyTimestampAndDuration; import zipkin.internal.DetectingSpanDecoder; import zipkin.internal.Span2; +import zipkin.internal.Span2Component; import zipkin.internal.Span2Converter; import zipkin.internal.Util; import zipkin.internal.v2.codec.Encoder; import zipkin.internal.v2.codec.MessageEncoder; +import zipkin.internal.v2.storage.AsyncSpanConsumer; +import zipkin.storage.Callback; import zipkin.storage.StorageComponent; import static java.util.Arrays.asList; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.ArgumentMatchers.eq; import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.never; import static org.mockito.Mockito.spy; import static org.mockito.Mockito.verify; import static org.mockito.Mockito.when; @@ -88,4 +94,26 @@ public class CollectorTest { verify(collector).acceptSpans(bytes, SpanDecoder.DETECTING_DECODER, NOOP); verify(collector).accept(asList(span1), NOOP); } + + /** + * When a version 2 storage component is in use, route directly to it as opposed to + * double-conversion. + */ + @Test public void routesToSpan2Collector() { + abstract class WithSpan2 extends Span2Component implements StorageComponent { + @Override public abstract AsyncSpanConsumer asyncSpan2Consumer(); + } + WithSpan2 storage = mock(WithSpan2.class); + AsyncSpanConsumer span2Consumer = mock(AsyncSpanConsumer.class); + when(storage.asyncSpan2Consumer()).thenReturn(span2Consumer); + + collector = spy(Collector.builder(Collector.class) + .storage(storage).build()); + + byte[] bytes = MessageEncoder.JSON_BYTES.encode(asList(Encoder.JSON.encode(span2_1))); + collector.acceptSpans(bytes, SpanDecoder.DETECTING_DECODER, NOOP); + + verify(collector, never()).isSampled(any(Span.class)); // skips v1 processing + verify(span2Consumer).accept(eq(asList(span2_1)), any(Callback.class)); // goes to v2 instead + } }