diff --git a/.gitignore b/.gitignore
index ccf024c145b..0141aa5d4c8 100644
--- a/.gitignore
+++ b/.gitignore
@@ -22,6 +22,7 @@ nosetests.xml
.settings
.DS_Store
.classpath
+.tool-versions
# Built documentation
docs/
diff --git a/google-cloud-spanner/clirr-ignored-differences.xml b/google-cloud-spanner/clirr-ignored-differences.xml
index 76d2aa7c79f..54eae11d51b 100644
--- a/google-cloud-spanner/clirr-ignored-differences.xml
+++ b/google-cloud-spanner/clirr-ignored-differences.xml
@@ -426,7 +426,6 @@
com/google/cloud/spanner/connection/Connection
void rollbackToSavepoint(java.lang.String)
-
7012
@@ -540,6 +539,12 @@
com/google/cloud/spanner/Dialect
java.lang.String getDefaultSchema()
+
+ 7005
+ com/google/cloud/spanner/PartitionedDmlTransaction
+ void setSpan(io.opencensus.trace.Span)
+ void setSpan(com.google.cloud.spanner.ISpan)
+
diff --git a/google-cloud-spanner/pom.xml b/google-cloud-spanner/pom.xml
index b3aa2b88943..90f1d73c6e6 100644
--- a/google-cloud-spanner/pom.xml
+++ b/google-cloud-spanner/pom.xml
@@ -247,6 +247,14 @@
opencensus-impl
test
+
+ io.opentelemetry
+ opentelemetry-api
+
+
+ io.opentelemetry
+ opentelemetry-context
+
com.google.auth
google-auth-library-oauth2-http
@@ -393,7 +401,6 @@
2.2
test
-
org.openjdk.jmh
@@ -407,9 +414,28 @@
1.37
test
-
+
+
+ io.opentelemetry
+ opentelemetry-sdk
+ test
+
+
+ io.opentelemetry
+ opentelemetry-sdk-metrics
+ test
+
+
+ io.opentelemetry
+ opentelemetry-sdk-trace
+ test
+
+
+ io.opentelemetry
+ opentelemetry-sdk-testing
+ test
+
-
java9
diff --git a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/AbstractReadContext.java b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/AbstractReadContext.java
index 0f4310f9b4d..0714b7651cc 100644
--- a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/AbstractReadContext.java
+++ b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/AbstractReadContext.java
@@ -39,6 +39,7 @@
import com.google.cloud.spanner.spi.v1.SpannerRpc;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
+import com.google.common.collect.ImmutableMap;
import com.google.common.util.concurrent.MoreExecutors;
import com.google.protobuf.ByteString;
import com.google.spanner.v1.BeginTransactionRequest;
@@ -53,8 +54,6 @@
import com.google.spanner.v1.Transaction;
import com.google.spanner.v1.TransactionOptions;
import com.google.spanner.v1.TransactionSelector;
-import io.opencensus.trace.Span;
-import io.opencensus.trace.Tracing;
import java.util.Map;
import java.util.concurrent.atomic.AtomicLong;
import javax.annotation.Nullable;
@@ -70,7 +69,8 @@ abstract class AbstractReadContext
abstract static class Builder, T extends AbstractReadContext> {
private SessionImpl session;
private SpannerRpc rpc;
- private Span span = Tracing.getTracer().getCurrentSpan();
+ private ISpan span;
+ private TraceWrapper tracer;
private int defaultPrefetchChunks = SpannerOptions.Builder.DEFAULT_PREFETCH_CHUNKS;
private QueryOptions defaultQueryOptions = SpannerOptions.Builder.DEFAULT_QUERY_OPTIONS;
private DirectedReadOptions defaultDirectedReadOption;
@@ -94,11 +94,16 @@ B setRpc(SpannerRpc rpc) {
return self();
}
- B setSpan(Span span) {
+ B setSpan(ISpan span) {
this.span = span;
return self();
}
+ B setTracer(TraceWrapper tracer) {
+ this.tracer = tracer;
+ return self();
+ }
+
B setDefaultPrefetchChunks(int defaultPrefetchChunks) {
this.defaultPrefetchChunks = defaultPrefetchChunks;
return self();
@@ -389,9 +394,12 @@ void initTransaction() {
}
transactionId = transaction.getId();
span.addAnnotation(
- "Transaction Creation Done", TraceUtil.getTransactionAnnotations(transaction));
+ "Transaction Creation Done",
+ ImmutableMap.of(
+ "Id", transaction.getId().toStringUtf8(), "Timestamp", timestamp.toString()));
+
} catch (SpannerException e) {
- span.addAnnotation("Transaction Creation Failed", TraceUtil.getExceptionAnnotations(e));
+ span.addAnnotation("Transaction Creation Failed", e);
throw e;
}
}
@@ -402,7 +410,8 @@ void initTransaction() {
final SessionImpl session;
final SpannerRpc rpc;
final ExecutorProvider executorProvider;
- Span span;
+ ISpan span;
+ TraceWrapper tracer;
private final int defaultPrefetchChunks;
private final QueryOptions defaultQueryOptions;
@@ -435,10 +444,11 @@ void initTransaction() {
this.span = builder.span;
this.executorProvider = builder.executorProvider;
this.clock = builder.clock;
+ this.tracer = builder.tracer;
}
@Override
- public void setSpan(Span span) {
+ public void setSpan(ISpan span) {
this.span = span;
}
@@ -692,6 +702,7 @@ ResultSet executeQueryInternalWithOptions(
MAX_BUFFERED_CHUNKS,
SpannerImpl.QUERY,
span,
+ tracer,
rpc.getExecuteQueryRetrySettings(),
rpc.getExecuteQueryRetryableCodes()) {
@Override
@@ -752,7 +763,7 @@ public final void invalidate() {
@Override
public void close() {
- span.end(TraceUtil.END_SPAN_OPTIONS);
+ span.end();
synchronized (lock) {
isClosed = true;
}
@@ -837,6 +848,7 @@ ResultSet readInternalWithOptions(
MAX_BUFFERED_CHUNKS,
SpannerImpl.READ,
span,
+ tracer,
rpc.getReadRetrySettings(),
rpc.getReadRetryableCodes()) {
@Override
diff --git a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/AbstractResultSet.java b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/AbstractResultSet.java
index c18e64165bc..d946257bc45 100644
--- a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/AbstractResultSet.java
+++ b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/AbstractResultSet.java
@@ -37,7 +37,6 @@
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import com.google.common.collect.AbstractIterator;
-import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Lists;
import com.google.common.io.CharSource;
import com.google.common.util.concurrent.Uninterruptibles;
@@ -53,11 +52,6 @@
import com.google.spanner.v1.Transaction;
import com.google.spanner.v1.TypeCode;
import io.grpc.Context;
-import io.opencensus.common.Scope;
-import io.opencensus.trace.AttributeValue;
-import io.opencensus.trace.Span;
-import io.opencensus.trace.Tracer;
-import io.opencensus.trace.Tracing;
import java.io.IOException;
import java.io.Serializable;
import java.math.BigDecimal;
@@ -87,7 +81,6 @@
/** Implementation of {@link ResultSet}. */
abstract class AbstractResultSet extends AbstractStructReader implements ResultSet {
- private static final Tracer tracer = Tracing.getTracer();
private static final com.google.protobuf.Value NULL_VALUE =
com.google.protobuf.Value.newBuilder().setNullValue(NullValue.NULL_VALUE).build();
@@ -1206,7 +1199,8 @@ abstract static class ResumableStreamIterator extends AbstractIterator buffer = new LinkedList<>();
private final int maxBufferSize;
- private final Span span;
+ private final ISpan span;
+ private final TraceWrapper tracer;
private CloseableIterator stream;
private ByteString resumeToken;
private boolean finished;
@@ -1220,12 +1214,14 @@ abstract static class ResumableStreamIterator extends AbstractIterator retryableCodes) {
checkArgument(maxBufferSize >= 0);
this.maxBufferSize = maxBufferSize;
- this.span = tracer.spanBuilderWithExplicitParent(streamName, parent).startSpan();
+ this.tracer = tracer;
+ this.span = tracer.spanBuilderWithExplicitParent(streamName, parent);
this.streamingRetrySettings = Preconditions.checkNotNull(streamingRetrySettings);
this.retryableCodes = Preconditions.checkNotNull(retryableCodes);
this.backOff = newBackOff();
@@ -1281,11 +1277,7 @@ private static long nextBackOffMillis(BackOff backoff) throws SpannerException {
}
private void backoffSleep(Context context, long backoffMillis) throws SpannerException {
- tracer
- .getCurrentSpan()
- .addAnnotation(
- "Backing off",
- ImmutableMap.of("Delay", AttributeValue.longAttributeValue(backoffMillis)));
+ tracer.getCurrentSpan().addAnnotation("Backing off", "Delay", backoffMillis);
final CountDownLatch latch = new CountDownLatch(1);
final Context.CancellationListener listener =
ignored -> {
@@ -1325,7 +1317,7 @@ public void execute(Runnable command) {
public void close(@Nullable String message) {
if (stream != null) {
stream.close(message);
- span.end(TraceUtil.END_SPAN_OPTIONS);
+ span.end();
stream = null;
}
}
@@ -1343,11 +1335,9 @@ protected PartialResultSet computeNext() {
if (stream == null) {
span.addAnnotation(
"Starting/Resuming stream",
- ImmutableMap.of(
- "ResumeToken",
- AttributeValue.stringAttributeValue(
- resumeToken == null ? "null" : resumeToken.toStringUtf8())));
- try (Scope s = tracer.withSpan(span)) {
+ "ResumeToken",
+ resumeToken == null ? "null" : resumeToken.toStringUtf8());
+ try (IScope scope = tracer.withSpan(span)) {
// When start a new stream set the Span as current to make the gRPC Span a child of
// this Span.
stream = checkNotNull(startStream(resumeToken));
@@ -1387,9 +1377,7 @@ protected PartialResultSet computeNext() {
}
} catch (SpannerException spannerException) {
if (safeToRetry && isRetryable(spannerException)) {
- span.addAnnotation(
- "Stream broken. Safe to retry",
- TraceUtil.getExceptionAnnotations(spannerException));
+ span.addAnnotation("Stream broken. Safe to retry", spannerException);
logger.log(Level.FINE, "Retryable exception, will sleep and retry", spannerException);
// Truncate any items in the buffer before the last retry token.
while (!buffer.isEmpty() && buffer.getLast().getResumeToken().isEmpty()) {
@@ -1397,7 +1385,7 @@ protected PartialResultSet computeNext() {
}
assert buffer.isEmpty() || buffer.getLast().getResumeToken().equals(resumeToken);
stream = null;
- try (Scope s = tracer.withSpan(span)) {
+ try (IScope s = tracer.withSpan(span)) {
long delay = spannerException.getRetryDelayInMillis();
if (delay != -1) {
backoffSleep(context, delay);
@@ -1408,12 +1396,12 @@ protected PartialResultSet computeNext() {
continue;
}
- span.addAnnotation("Stream broken. Not safe to retry");
- TraceUtil.setWithFailure(span, spannerException);
+ span.addAnnotation("Stream broken. Not safe to retry", spannerException);
+ span.setStatus(spannerException);
throw spannerException;
} catch (RuntimeException e) {
- span.addAnnotation("Stream broken. Not safe to retry");
- TraceUtil.setWithFailure(span, e);
+ span.addAnnotation("Stream broken. Not safe to retry", e);
+ span.setStatus(e);
throw e;
}
}
diff --git a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/AsyncTransactionManagerImpl.java b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/AsyncTransactionManagerImpl.java
index 510bd02bc0e..8b20dd824a0 100644
--- a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/AsyncTransactionManagerImpl.java
+++ b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/AsyncTransactionManagerImpl.java
@@ -28,31 +28,27 @@
import com.google.common.base.MoreObjects;
import com.google.common.base.Preconditions;
import com.google.common.util.concurrent.MoreExecutors;
-import io.opencensus.trace.Span;
-import io.opencensus.trace.Tracer;
-import io.opencensus.trace.Tracing;
/** Implementation of {@link AsyncTransactionManager}. */
final class AsyncTransactionManagerImpl
implements CommittableAsyncTransactionManager, SessionTransaction {
- private static final Tracer tracer = Tracing.getTracer();
private final SessionImpl session;
- private Span span;
+ private ISpan span;
private final Options options;
private TransactionRunnerImpl.TransactionContextImpl txn;
private TransactionState txnState;
private final SettableApiFuture commitResponse = SettableApiFuture.create();
- AsyncTransactionManagerImpl(SessionImpl session, Span span, TransactionOption... options) {
+ AsyncTransactionManagerImpl(SessionImpl session, ISpan span, TransactionOption... options) {
this.session = session;
this.span = span;
this.options = Options.fromTransactionOptions(options);
}
@Override
- public void setSpan(Span span) {
+ public void setSpan(ISpan span) {
this.span = span;
}
diff --git a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/BatchClientImpl.java b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/BatchClientImpl.java
index eab90a266c9..664cde1edbb 100644
--- a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/BatchClientImpl.java
+++ b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/BatchClientImpl.java
@@ -30,7 +30,6 @@
import com.google.spanner.v1.PartitionReadRequest;
import com.google.spanner.v1.PartitionResponse;
import com.google.spanner.v1.TransactionSelector;
-import io.opencensus.trace.Tracing;
import java.util.List;
import java.util.Map;
import javax.annotation.Nullable;
@@ -62,7 +61,9 @@ public BatchReadOnlyTransaction batchReadOnlyTransaction(TimestampBound bound) {
.setExecutorProvider(sessionClient.getSpanner().getAsyncExecutorProvider())
.setDefaultPrefetchChunks(sessionClient.getSpanner().getDefaultPrefetchChunks())
.setDefaultDirectedReadOptions(
- sessionClient.getSpanner().getOptions().getDirectedReadOptions()),
+ sessionClient.getSpanner().getOptions().getDirectedReadOptions())
+ .setSpan(sessionClient.getSpanner().getTracer().getCurrentSpan())
+ .setTracer(sessionClient.getSpanner().getTracer()),
checkNotNull(bound));
}
@@ -81,7 +82,9 @@ public BatchReadOnlyTransaction batchReadOnlyTransaction(BatchTransactionId batc
.setExecutorProvider(sessionClient.getSpanner().getAsyncExecutorProvider())
.setDefaultPrefetchChunks(sessionClient.getSpanner().getDefaultPrefetchChunks())
.setDefaultDirectedReadOptions(
- sessionClient.getSpanner().getOptions().getDirectedReadOptions()),
+ sessionClient.getSpanner().getOptions().getDirectedReadOptions())
+ .setSpan(sessionClient.getSpanner().getTracer().getCurrentSpan())
+ .setTracer(sessionClient.getSpanner().getTracer()),
batchTransactionId);
}
@@ -95,7 +98,6 @@ private static class BatchReadOnlyTransactionImpl extends MultiUseReadOnlyTransa
super(builder.setTimestampBound(bound));
this.sessionName = session.getName();
this.options = session.getOptions();
- setSpan(Tracing.getTracer().getCurrentSpan());
initTransaction();
}
@@ -104,7 +106,6 @@ private static class BatchReadOnlyTransactionImpl extends MultiUseReadOnlyTransa
super(builder.setTransactionId(batchTransactionId.getTransactionId()));
this.sessionName = session.getName();
this.options = session.getOptions();
- setSpan(Tracing.getTracer().getCurrentSpan());
}
@Override
diff --git a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/DatabaseClientImpl.java b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/DatabaseClientImpl.java
index 3835cb1f338..b63ad379305 100644
--- a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/DatabaseClientImpl.java
+++ b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/DatabaseClientImpl.java
@@ -26,29 +26,25 @@
import com.google.common.base.Function;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.spanner.v1.BatchWriteResponse;
-import io.opencensus.common.Scope;
-import io.opencensus.trace.Span;
-import io.opencensus.trace.Tracer;
-import io.opencensus.trace.Tracing;
import javax.annotation.Nullable;
class DatabaseClientImpl implements DatabaseClient {
private static final String READ_WRITE_TRANSACTION = "CloudSpanner.ReadWriteTransaction";
private static final String READ_ONLY_TRANSACTION = "CloudSpanner.ReadOnlyTransaction";
private static final String PARTITION_DML_TRANSACTION = "CloudSpanner.PartitionDMLTransaction";
- private static final Tracer tracer = Tracing.getTracer();
-
+ private final TraceWrapper tracer;
@VisibleForTesting final String clientId;
@VisibleForTesting final SessionPool pool;
@VisibleForTesting
- DatabaseClientImpl(SessionPool pool) {
- this("", pool);
+ DatabaseClientImpl(SessionPool pool, TraceWrapper tracer) {
+ this("", pool, tracer);
}
- DatabaseClientImpl(String clientId, SessionPool pool) {
+ DatabaseClientImpl(String clientId, SessionPool pool, TraceWrapper tracer) {
this.clientId = clientId;
this.pool = pool;
+ this.tracer = tracer;
}
@VisibleForTesting
@@ -76,14 +72,14 @@ public Timestamp write(final Iterable mutations) throws SpannerExcepti
public CommitResponse writeWithOptions(
final Iterable mutations, final TransactionOption... options)
throws SpannerException {
- Span span = tracer.spanBuilder(READ_WRITE_TRANSACTION).startSpan();
- try (Scope s = tracer.withSpan(span)) {
+ ISpan span = tracer.spanBuilder(READ_WRITE_TRANSACTION);
+ try (IScope s = tracer.withSpan(span)) {
return runWithSessionRetry(session -> session.writeWithOptions(mutations, options));
} catch (RuntimeException e) {
- TraceUtil.setWithFailure(span, e);
+ span.setStatus(e);
throw e;
} finally {
- span.end(TraceUtil.END_SPAN_OPTIONS);
+ span.end();
}
}
@@ -96,15 +92,15 @@ public Timestamp writeAtLeastOnce(final Iterable mutations) throws Spa
public CommitResponse writeAtLeastOnceWithOptions(
final Iterable mutations, final TransactionOption... options)
throws SpannerException {
- Span span = tracer.spanBuilder(READ_WRITE_TRANSACTION).startSpan();
- try (Scope s = tracer.withSpan(span)) {
+ ISpan span = tracer.spanBuilder(READ_WRITE_TRANSACTION);
+ try (IScope s = tracer.withSpan(span)) {
return runWithSessionRetry(
session -> session.writeAtLeastOnceWithOptions(mutations, options));
} catch (RuntimeException e) {
- TraceUtil.setWithFailure(span, e);
+ span.setStatus(e);
throw e;
} finally {
- span.end(TraceUtil.END_SPAN_OPTIONS);
+ span.end();
}
}
@@ -112,136 +108,145 @@ public CommitResponse writeAtLeastOnceWithOptions(
public ServerStream batchWriteAtLeastOnce(
final Iterable mutationGroups, final TransactionOption... options)
throws SpannerException {
- Span span = tracer.spanBuilder(READ_WRITE_TRANSACTION).startSpan();
- try (Scope s = tracer.withSpan(span)) {
+ ISpan span = tracer.spanBuilder(READ_WRITE_TRANSACTION);
+ try (IScope s = tracer.withSpan(span)) {
return runWithSessionRetry(session -> session.batchWriteAtLeastOnce(mutationGroups, options));
} catch (RuntimeException e) {
- TraceUtil.setWithFailure(span, e);
+ span.setStatus(e);
throw e;
} finally {
- span.end(TraceUtil.END_SPAN_OPTIONS);
+ span.end();
}
}
@Override
public ReadContext singleUse() {
- Span span = tracer.spanBuilder(READ_ONLY_TRANSACTION).startSpan();
- try (Scope s = tracer.withSpan(span)) {
+ ISpan span = tracer.spanBuilder(READ_ONLY_TRANSACTION);
+ try (IScope s = tracer.withSpan(span)) {
return getSession().singleUse();
} catch (RuntimeException e) {
- TraceUtil.endSpanWithFailure(span, e);
+ span.setStatus(e);
+ span.end();
throw e;
}
}
@Override
public ReadContext singleUse(TimestampBound bound) {
- Span span = tracer.spanBuilder(READ_ONLY_TRANSACTION).startSpan();
- try (Scope s = tracer.withSpan(span)) {
+ ISpan span = tracer.spanBuilder(READ_ONLY_TRANSACTION);
+ try (IScope s = tracer.withSpan(span)) {
return getSession().singleUse(bound);
} catch (RuntimeException e) {
- TraceUtil.endSpanWithFailure(span, e);
+ span.setStatus(e);
+ span.end();
throw e;
}
}
@Override
public ReadOnlyTransaction singleUseReadOnlyTransaction() {
- Span span = tracer.spanBuilder(READ_ONLY_TRANSACTION).startSpan();
- try (Scope s = tracer.withSpan(span)) {
+ ISpan span = tracer.spanBuilder(READ_ONLY_TRANSACTION);
+ try (IScope s = tracer.withSpan(span)) {
return getSession().singleUseReadOnlyTransaction();
} catch (RuntimeException e) {
- TraceUtil.endSpanWithFailure(span, e);
+ span.setStatus(e);
+ span.end();
throw e;
}
}
@Override
public ReadOnlyTransaction singleUseReadOnlyTransaction(TimestampBound bound) {
- Span span = tracer.spanBuilder(READ_ONLY_TRANSACTION).startSpan();
- try (Scope s = tracer.withSpan(span)) {
+ ISpan span = tracer.spanBuilder(READ_ONLY_TRANSACTION);
+ try (IScope s = tracer.withSpan(span)) {
return getSession().singleUseReadOnlyTransaction(bound);
} catch (RuntimeException e) {
- TraceUtil.endSpanWithFailure(span, e);
+ span.setStatus(e);
+ span.end();
throw e;
}
}
@Override
public ReadOnlyTransaction readOnlyTransaction() {
- Span span = tracer.spanBuilder(READ_ONLY_TRANSACTION).startSpan();
- try (Scope s = tracer.withSpan(span)) {
+ ISpan span = tracer.spanBuilder(READ_ONLY_TRANSACTION);
+ try (IScope s = tracer.withSpan(span)) {
return getSession().readOnlyTransaction();
} catch (RuntimeException e) {
- TraceUtil.endSpanWithFailure(span, e);
+ span.setStatus(e);
+ span.end();
throw e;
}
}
@Override
public ReadOnlyTransaction readOnlyTransaction(TimestampBound bound) {
- Span span = tracer.spanBuilder(READ_ONLY_TRANSACTION).startSpan();
- try (Scope s = tracer.withSpan(span)) {
+ ISpan span = tracer.spanBuilder(READ_ONLY_TRANSACTION);
+ try (IScope s = tracer.withSpan(span)) {
return getSession().readOnlyTransaction(bound);
} catch (RuntimeException e) {
- TraceUtil.endSpanWithFailure(span, e);
+ span.setStatus(e);
+ span.end();
throw e;
}
}
@Override
public TransactionRunner readWriteTransaction(TransactionOption... options) {
- Span span = tracer.spanBuilder(READ_WRITE_TRANSACTION).startSpan();
- try (Scope s = tracer.withSpan(span)) {
+ ISpan span = tracer.spanBuilder(READ_WRITE_TRANSACTION);
+ try (IScope s = tracer.withSpan(span)) {
return getSession().readWriteTransaction(options);
} catch (RuntimeException e) {
- TraceUtil.endSpanWithFailure(span, e);
+ span.setStatus(e);
+ span.end();
throw e;
- } finally {
- span.end(TraceUtil.END_SPAN_OPTIONS);
}
}
@Override
public TransactionManager transactionManager(TransactionOption... options) {
- Span span = tracer.spanBuilder(READ_WRITE_TRANSACTION).startSpan();
- try (Scope s = tracer.withSpan(span)) {
+ ISpan span = tracer.spanBuilder(READ_WRITE_TRANSACTION);
+ try (IScope s = tracer.withSpan(span)) {
return getSession().transactionManager(options);
} catch (RuntimeException e) {
- TraceUtil.endSpanWithFailure(span, e);
+ span.setStatus(e);
+ span.end();
throw e;
}
}
@Override
public AsyncRunner runAsync(TransactionOption... options) {
- Span span = tracer.spanBuilder(READ_WRITE_TRANSACTION).startSpan();
- try (Scope s = tracer.withSpan(span)) {
+ ISpan span = tracer.spanBuilder(READ_WRITE_TRANSACTION);
+ try (IScope s = tracer.withSpan(span)) {
return getSession().runAsync(options);
} catch (RuntimeException e) {
- TraceUtil.endSpanWithFailure(span, e);
+ span.setStatus(e);
+ span.end();
throw e;
}
}
@Override
public AsyncTransactionManager transactionManagerAsync(TransactionOption... options) {
- Span span = tracer.spanBuilder(READ_WRITE_TRANSACTION).startSpan();
- try (Scope s = tracer.withSpan(span)) {
+ ISpan span = tracer.spanBuilder(READ_WRITE_TRANSACTION);
+ try (IScope s = tracer.withSpan(span)) {
return getSession().transactionManagerAsync(options);
} catch (RuntimeException e) {
- TraceUtil.endSpanWithFailure(span, e);
+ span.setStatus(e);
+ span.end();
throw e;
}
}
@Override
public long executePartitionedUpdate(final Statement stmt, final UpdateOption... options) {
- Span span = tracer.spanBuilder(PARTITION_DML_TRANSACTION).startSpan();
- try (Scope s = tracer.withSpan(span)) {
+ ISpan span = tracer.spanBuilder(PARTITION_DML_TRANSACTION);
+ try (IScope s = tracer.withSpan(span)) {
return runWithSessionRetry(session -> session.executePartitionedUpdate(stmt, options));
} catch (RuntimeException e) {
- TraceUtil.endSpanWithFailure(span, e);
+ span.setStatus(e);
+ span.end();
throw e;
}
}
diff --git a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/IScope.java b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/IScope.java
new file mode 100644
index 00000000000..cbefe47b887
--- /dev/null
+++ b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/IScope.java
@@ -0,0 +1,26 @@
+/*
+ * Copyright 2023 Google LLC
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.google.cloud.spanner;
+
+/**
+ * This interface represents a scope that wraps both OpenCensus and OpenTelemetry scopes. It extends
+ * the AutoCloseable interface and overrides the close method that does not throw an exception.
+ */
+interface IScope extends AutoCloseable {
+ @Override
+ void close();
+}
diff --git a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/ISpan.java b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/ISpan.java
new file mode 100644
index 00000000000..ce837de0e58
--- /dev/null
+++ b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/ISpan.java
@@ -0,0 +1,45 @@
+/*
+ * Copyright 2023 Google LLC
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.google.cloud.spanner;
+
+import java.util.Map;
+
+interface ISpan {
+
+ /**
+ * Adds an annotation to the OpenCensus and OpenTelemetry span.
+ *
+ * @param message the description of the annotation event.
+ * @param attributes the map of attribute key-value pairs that will be added; these are associated
+ * with this annotation.
+ */
+ void addAnnotation(String message, Map attributes);
+
+ void addAnnotation(String message);
+
+ void addAnnotation(String message, String key, String value);
+
+ void addAnnotation(String message, String key, long value);
+
+ void addAnnotation(String message, Throwable e);
+
+ void setStatus(Throwable e);
+
+ void setStatus(ErrorCode errorCode);
+
+ void end();
+}
diff --git a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/MetricRegistryConstants.java b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/MetricRegistryConstants.java
index 3512a75732d..d0098e961e6 100644
--- a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/MetricRegistryConstants.java
+++ b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/MetricRegistryConstants.java
@@ -65,13 +65,20 @@ class MetricRegistryConstants {
/** Unit to represent counts. */
static final String COUNT = "1";
+ static final String INSTRUMENTATION_SCOPE = "cloud.google.com/java";
+
+ static final String METRIC_PREFIX = "cloud.google.com/java/";
+
// The Metric name and description
- static final String MAX_IN_USE_SESSIONS = "cloud.google.com/java/spanner/max_in_use_sessions";
- static final String MAX_ALLOWED_SESSIONS = "cloud.google.com/java/spanner/max_allowed_sessions";
- static final String GET_SESSION_TIMEOUTS = "cloud.google.com/java/spanner/get_session_timeouts";
- static final String NUM_ACQUIRED_SESSIONS = "cloud.google.com/java/spanner/num_acquired_sessions";
- static final String NUM_RELEASED_SESSIONS = "cloud.google.com/java/spanner/num_released_sessions";
- static final String NUM_SESSIONS_IN_POOL = "cloud.google.com/java/spanner/num_sessions_in_pool";
+ static final String MAX_IN_USE_SESSIONS = "spanner/max_in_use_sessions";
+ static final String MAX_ALLOWED_SESSIONS = "spanner/max_allowed_sessions";
+ static final String GET_SESSION_TIMEOUTS = "spanner/get_session_timeouts";
+ static final String NUM_ACQUIRED_SESSIONS = "spanner/num_acquired_sessions";
+ static final String NUM_RELEASED_SESSIONS = "spanner/num_released_sessions";
+ static final String NUM_SESSIONS_IN_POOL = "spanner/num_sessions_in_pool";
+ static final String NUM_SESSIONS_IN_USE = "spanner/num_in_use_sessions";
+ static final String NUM_SESSIONS_AVAILABLE = "spanner/num_available_sessions";
+ static final String SESSIONS_TYPE = "session_type";
static final String MAX_IN_USE_SESSIONS_DESCRIPTION =
"The maximum number of sessions in use during the last 10 minute interval.";
@@ -84,4 +91,11 @@ class MetricRegistryConstants {
static final String NUM_RELEASED_SESSIONS_DESCRIPTION =
"The number of sessions released by the user and pool maintainer.";
static final String NUM_SESSIONS_IN_POOL_DESCRIPTION = "The number of sessions in the pool.";
+
+ static final String SPANNER_GFE_LATENCY = "spanner/gfe_latency";
+ static final String SPANNER_GFE_LATENCY_DESCRIPTION =
+ "Latency between Google's network receiving an RPC and reading back the first byte of the response";
+ static final String SPANNER_GFE_HEADER_MISSING_COUNT = "spanner/gfe_header_missing_count";
+ static final String SPANNER_GFE_HEADER_MISSING_COUNT_DESCRIPTION =
+ "Number of RPC responses received without the server-timing header, most likely means that the RPC never reached Google's network";
}
diff --git a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/OpenCensusScope.java b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/OpenCensusScope.java
new file mode 100644
index 00000000000..81c1db16571
--- /dev/null
+++ b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/OpenCensusScope.java
@@ -0,0 +1,33 @@
+/*
+ * Copyright 2023 Google LLC
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.google.cloud.spanner;
+
+import io.opencensus.common.Scope;
+
+class OpenCensusScope implements IScope {
+
+ private final Scope openCensusScope;
+
+ OpenCensusScope(Scope openCensusScope) {
+ this.openCensusScope = openCensusScope;
+ }
+
+ @Override
+ public void close() {
+ openCensusScope.close();
+ }
+}
diff --git a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/OpenCensusSpan.java b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/OpenCensusSpan.java
new file mode 100644
index 00000000000..86e43778f31
--- /dev/null
+++ b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/OpenCensusSpan.java
@@ -0,0 +1,111 @@
+/*
+ * Copyright 2023 Google LLC
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.google.cloud.spanner;
+
+import com.google.common.collect.ImmutableMap;
+import io.opencensus.contrib.grpc.util.StatusConverter;
+import io.opencensus.trace.AttributeValue;
+import io.opencensus.trace.EndSpanOptions;
+import io.opencensus.trace.Span;
+import io.opencensus.trace.Status;
+import java.util.HashMap;
+import java.util.Map;
+
+public class OpenCensusSpan implements ISpan {
+
+ static final EndSpanOptions END_SPAN_OPTIONS =
+ EndSpanOptions.builder().setSampleToLocalSpanStore(true).build();
+ private final Span openCensusSpan;
+
+ public OpenCensusSpan(Span openCensusSpan) {
+ this.openCensusSpan = openCensusSpan;
+ }
+
+ Span getOpenCensusSpan() {
+ return openCensusSpan;
+ }
+
+ private ImmutableMap getOpenCensusExceptionAnnotations(Throwable e) {
+ if (e instanceof SpannerException) {
+ return ImmutableMap.of(
+ "Status",
+ AttributeValue.stringAttributeValue(((SpannerException) e).getErrorCode().toString()));
+ }
+ return ImmutableMap.of();
+ }
+
+ @Override
+ public void addAnnotation(String message, Map attributes) {
+ Map ocAttributeValues = new HashMap<>();
+ for (Map.Entry entry : attributes.entrySet()) {
+ String key = entry.getKey();
+ Object value = entry.getValue();
+ if (value instanceof String) {
+ ocAttributeValues.put(key, AttributeValue.stringAttributeValue((String) value));
+ } else if (value instanceof Long) {
+ ocAttributeValues.put(key, AttributeValue.longAttributeValue((Long) value));
+ }
+ }
+
+ if (ocAttributeValues.size() > 0) {
+ openCensusSpan.addAnnotation(message, ocAttributeValues);
+ }
+ }
+
+ @Override
+ public void addAnnotation(String message) {
+ openCensusSpan.addAnnotation(message);
+ }
+
+ @Override
+ public void addAnnotation(String message, String key, String value) {
+ openCensusSpan.addAnnotation(
+ message, ImmutableMap.of(key, AttributeValue.stringAttributeValue(value)));
+ }
+
+ @Override
+ public void addAnnotation(String message, String key, long value) {
+ openCensusSpan.addAnnotation(
+ message, ImmutableMap.of(key, AttributeValue.longAttributeValue(value)));
+ }
+
+ @Override
+ public void addAnnotation(String message, Throwable e) {
+ openCensusSpan.addAnnotation(message, this.getOpenCensusExceptionAnnotations(e));
+ }
+
+ @Override
+ public void setStatus(Throwable e) {
+ if (e instanceof SpannerException) {
+ openCensusSpan.setStatus(
+ StatusConverter.fromGrpcStatus(((SpannerException) e).getErrorCode().getGrpcStatus())
+ .withDescription(e.getMessage()));
+ } else {
+ openCensusSpan.setStatus(Status.INTERNAL.withDescription(e.getMessage()));
+ }
+ }
+
+ @Override
+ public void setStatus(ErrorCode errorCode) {
+ openCensusSpan.setStatus(StatusConverter.fromGrpcStatus(errorCode.getGrpcStatus()));
+ }
+
+ @Override
+ public void end() {
+ openCensusSpan.end(END_SPAN_OPTIONS);
+ }
+}
diff --git a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/OpenTelemetryScope.java b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/OpenTelemetryScope.java
new file mode 100644
index 00000000000..6766bc54e6b
--- /dev/null
+++ b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/OpenTelemetryScope.java
@@ -0,0 +1,33 @@
+/*
+ * Copyright 2023 Google LLC
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.google.cloud.spanner;
+
+import io.opentelemetry.context.Scope;
+
+class OpenTelemetryScope implements IScope {
+
+ private final Scope openTelemetryScope;
+
+ OpenTelemetryScope(Scope openTelemetryScope) {
+ this.openTelemetryScope = openTelemetryScope;
+ }
+
+ @Override
+ public void close() {
+ openTelemetryScope.close();
+ }
+}
diff --git a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/OpenTelemetrySpan.java b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/OpenTelemetrySpan.java
new file mode 100644
index 00000000000..98e1540e760
--- /dev/null
+++ b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/OpenTelemetrySpan.java
@@ -0,0 +1,99 @@
+/*
+ * Copyright 2023 Google LLC
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.google.cloud.spanner;
+
+import io.opentelemetry.api.common.Attributes;
+import io.opentelemetry.api.common.AttributesBuilder;
+import io.opentelemetry.api.trace.Span;
+import io.opentelemetry.api.trace.StatusCode;
+import java.util.Map;
+
+class OpenTelemetrySpan implements ISpan {
+
+ private final io.opentelemetry.api.trace.Span openTelemetrySpan;
+
+ OpenTelemetrySpan(Span openTelemetrySpan) {
+ this.openTelemetrySpan = openTelemetrySpan;
+ }
+
+ Span getOpenTelemetrySpan() {
+ return openTelemetrySpan;
+ }
+
+ @Override
+ public void addAnnotation(String message, Map attributes) {
+ AttributesBuilder otAttributesBuilder = Attributes.builder();
+ for (Map.Entry entry : attributes.entrySet()) {
+ String key = entry.getKey();
+ Object value = entry.getValue();
+ if (value instanceof String) {
+ otAttributesBuilder.put(key, (String) value);
+ } else if (value instanceof Long) {
+ otAttributesBuilder.put(key, (Long) value);
+ }
+ }
+ openTelemetrySpan.addEvent(message, otAttributesBuilder.build());
+ }
+
+ @Override
+ public void addAnnotation(String message) {
+ openTelemetrySpan.addEvent(message);
+ }
+
+ @Override
+ public void addAnnotation(String message, String key, String value) {
+ openTelemetrySpan.addEvent(message, Attributes.builder().put(key, value).build());
+ }
+
+ @Override
+ public void addAnnotation(String message, String key, long value) {
+ openTelemetrySpan.addEvent(message, Attributes.builder().put(key, value).build());
+ }
+
+ @Override
+ public void addAnnotation(String message, Throwable e) {
+ openTelemetrySpan.addEvent(message, this.createOpenTelemetryExceptionAnnotations(e));
+ }
+
+ @Override
+ public void setStatus(Throwable e) {
+ if (e instanceof SpannerException) {
+ openTelemetrySpan.setStatus(StatusCode.ERROR, ((SpannerException) e).getErrorCode().name());
+ } else {
+ openTelemetrySpan.setStatus(StatusCode.ERROR, ErrorCode.INTERNAL.name());
+ }
+ openTelemetrySpan.recordException(e);
+ }
+
+ @Override
+ public void setStatus(ErrorCode errorCode) {
+ openTelemetrySpan.setStatus(StatusCode.ERROR, errorCode.name());
+ }
+
+ @Override
+ public void end() {
+ openTelemetrySpan.end();
+ }
+
+ private Attributes createOpenTelemetryExceptionAnnotations(Throwable e) {
+ AttributesBuilder attributesBuilder = Attributes.builder();
+ if (e instanceof SpannerException) {
+ attributesBuilder.put("Status", ((SpannerException) e).getErrorCode().toString());
+ }
+ return attributesBuilder.build();
+ }
+}
diff --git a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/PartitionedDmlTransaction.java b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/PartitionedDmlTransaction.java
index 36991b18c3d..cabc270566c 100644
--- a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/PartitionedDmlTransaction.java
+++ b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/PartitionedDmlTransaction.java
@@ -18,6 +18,7 @@
import static com.google.common.base.Preconditions.checkState;
+import com.google.api.core.InternalApi;
import com.google.api.gax.grpc.GrpcStatusCode;
import com.google.api.gax.rpc.AbortedException;
import com.google.api.gax.rpc.DeadlineExceededException;
@@ -38,7 +39,6 @@
import com.google.spanner.v1.TransactionOptions;
import com.google.spanner.v1.TransactionSelector;
import io.grpc.Status;
-import io.opencensus.trace.Span;
import java.util.Map;
import java.util.concurrent.TimeUnit;
import java.util.logging.Level;
@@ -46,6 +46,7 @@
import org.threeten.bp.Duration;
import org.threeten.bp.temporal.ChronoUnit;
+@InternalApi
public class PartitionedDmlTransaction implements SessionImpl.SessionTransaction {
private static final Logger LOGGER = Logger.getLogger(PartitionedDmlTransaction.class.getName());
@@ -137,9 +138,9 @@ public void invalidate() {
isValid = false;
}
- // No-op method needed to implement SessionTransaction interface.
+ /** No-op method needed to implement SessionTransaction interface. */
@Override
- public void setSpan(Span span) {}
+ public void setSpan(ISpan span) {}
private Duration tryUpdateTimeout(final Duration timeout, final Stopwatch stopwatch) {
final Duration remainingTimeout =
diff --git a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/SessionClient.java b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/SessionClient.java
index 474d99671e8..b294ef33395 100644
--- a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/SessionClient.java
+++ b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/SessionClient.java
@@ -25,8 +25,6 @@
import com.google.common.base.Preconditions;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Maps;
-import io.opencensus.common.Scope;
-import io.opencensus.trace.Span;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
@@ -127,16 +125,17 @@ private BatchCreateSessionsRunnable(
public void run() {
List sessions;
int remainingSessionsToCreate = sessionCount;
- Span span = SpannerImpl.tracer.spanBuilder(SpannerImpl.BATCH_CREATE_SESSIONS).startSpan();
- try (Scope s = SpannerImpl.tracer.withSpan(span)) {
- SpannerImpl.tracer
+ ISpan span = spanner.getTracer().spanBuilder(SpannerImpl.BATCH_CREATE_SESSIONS);
+ try (IScope s = spanner.getTracer().withSpan(span)) {
+ spanner
+ .getTracer()
.getCurrentSpan()
.addAnnotation(String.format("Creating %d sessions", sessionCount));
while (remainingSessionsToCreate > 0) {
try {
sessions = internalBatchCreateSessions(remainingSessionsToCreate, channelHint);
} catch (Throwable t) {
- TraceUtil.setWithFailure(SpannerImpl.tracer.getCurrentSpan(), t);
+ spanner.getTracer().getCurrentSpan().setStatus(t);
consumer.onSessionCreateFailure(t, remainingSessionsToCreate);
break;
}
@@ -146,7 +145,7 @@ public void run() {
remainingSessionsToCreate -= sessions.size();
}
} finally {
- span.end(TraceUtil.END_SPAN_OPTIONS);
+ span.end();
}
}
}
@@ -206,8 +205,8 @@ SessionImpl createSession() {
synchronized (this) {
options = optionMap(SessionOption.channelHint(sessionChannelCounter++));
}
- Span span = SpannerImpl.tracer.spanBuilder(SpannerImpl.CREATE_SESSION).startSpan();
- try (Scope s = SpannerImpl.tracer.withSpan(span)) {
+ ISpan span = spanner.getTracer().spanBuilder(SpannerImpl.CREATE_SESSION);
+ try (IScope s = spanner.getTracer().withSpan(span)) {
com.google.spanner.v1.Session session =
spanner
.getRpc()
@@ -218,10 +217,10 @@ SessionImpl createSession() {
options);
return new SessionImpl(spanner, session.getName(), options);
} catch (RuntimeException e) {
- TraceUtil.setWithFailure(span, e);
+ span.setStatus(e);
throw e;
} finally {
- span.end(TraceUtil.END_SPAN_OPTIONS);
+ span.end();
}
}
@@ -290,13 +289,13 @@ void asyncBatchCreateSessions(
private List internalBatchCreateSessions(
final int sessionCount, final long channelHint) throws SpannerException {
final Map options = optionMap(SessionOption.channelHint(channelHint));
- Span parent = SpannerImpl.tracer.getCurrentSpan();
- Span span =
- SpannerImpl.tracer
- .spanBuilderWithExplicitParent(SpannerImpl.BATCH_CREATE_SESSIONS_REQUEST, parent)
- .startSpan();
+ ISpan parent = spanner.getTracer().getCurrentSpan();
+ ISpan span =
+ spanner
+ .getTracer()
+ .spanBuilderWithExplicitParent(SpannerImpl.BATCH_CREATE_SESSIONS_REQUEST, parent);
span.addAnnotation(String.format("Requesting %d sessions", sessionCount));
- try (Scope s = SpannerImpl.tracer.withSpan(span)) {
+ try (IScope s = spanner.getTracer().withSpan(span)) {
List sessions =
spanner
.getRpc()
@@ -309,14 +308,15 @@ private List internalBatchCreateSessions(
span.addAnnotation(
String.format(
"Request for %d sessions returned %d sessions", sessionCount, sessions.size()));
- span.end(TraceUtil.END_SPAN_OPTIONS);
+ span.end();
List res = new ArrayList<>(sessionCount);
for (com.google.spanner.v1.Session session : sessions) {
res.add(new SessionImpl(spanner, session.getName(), options));
}
return res;
} catch (RuntimeException e) {
- TraceUtil.endSpanWithFailure(span, e);
+ span.setStatus(e);
+ span.end();
throw e;
}
}
diff --git a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/SessionImpl.java b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/SessionImpl.java
index 92e75e332e7..29928f61cec 100644
--- a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/SessionImpl.java
+++ b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/SessionImpl.java
@@ -44,10 +44,6 @@
import com.google.spanner.v1.RequestOptions;
import com.google.spanner.v1.Transaction;
import com.google.spanner.v1.TransactionOptions;
-import io.opencensus.common.Scope;
-import io.opencensus.trace.Span;
-import io.opencensus.trace.Tracer;
-import io.opencensus.trace.Tracing;
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
@@ -61,8 +57,7 @@
* users need not be aware of the actual session management, pooling and handling.
*/
class SessionImpl implements Session {
-
- private static final Tracer tracer = Tracing.getTracer();
+ private final TraceWrapper tracer;
/** Keep track of running transactions on this session per thread. */
static final ThreadLocal hasPendingTransaction = ThreadLocal.withInitial(() -> false);
@@ -93,7 +88,7 @@ interface SessionTransaction {
void invalidate();
/** Registers the current span on the transaction. */
- void setSpan(Span span);
+ void setSpan(ISpan span);
}
private final SpannerImpl spanner;
@@ -102,11 +97,12 @@ interface SessionTransaction {
private SessionTransaction activeTransaction;
ByteString readyTransactionId;
private final Map options;
- private Span currentSpan;
private volatile Instant lastUseTime;
+ private ISpan currentSpan;
SessionImpl(SpannerImpl spanner, String name, Map options) {
this.spanner = spanner;
+ this.tracer = spanner.getTracer();
this.options = options;
this.name = checkNotNull(name);
this.databaseId = SessionId.of(name).getDatabaseId();
@@ -122,11 +118,11 @@ public String getName() {
return options;
}
- void setCurrentSpan(Span span) {
+ void setCurrentSpan(ISpan span) {
currentSpan = span;
}
- Span getCurrentSpan() {
+ ISpan getCurrentSpan() {
return currentSpan;
}
@@ -202,15 +198,15 @@ public CommitResponse writeAtLeastOnceWithOptions(
requestBuilder.setRequestOptions(commitRequestOptions);
}
CommitRequest request = requestBuilder.build();
- Span span = tracer.spanBuilder(SpannerImpl.COMMIT).startSpan();
- try (Scope s = tracer.withSpan(span)) {
+ ISpan span = tracer.spanBuilder(SpannerImpl.COMMIT);
+ try (IScope s = tracer.withSpan(span)) {
return SpannerRetryHelper.runTxWithRetriesOnAborted(
() -> new CommitResponse(spanner.getRpc().commit(request, this.options)));
} catch (RuntimeException e) {
- TraceUtil.setWithFailure(span, e);
+ span.setStatus(e);
throw e;
} finally {
- span.end(TraceUtil.END_SPAN_OPTIONS);
+ span.end();
}
}
@@ -242,14 +238,14 @@ public ServerStream batchWriteAtLeastOnce(
if (batchWriteRequestOptions != null) {
requestBuilder.setRequestOptions(batchWriteRequestOptions);
}
- Span span = tracer.spanBuilder(SpannerImpl.BATCH_WRITE).startSpan();
- try (Scope s = tracer.withSpan(span)) {
+ ISpan span = tracer.spanBuilder(SpannerImpl.BATCH_WRITE);
+ try (IScope s = tracer.withSpan(span)) {
return spanner.getRpc().batchWriteAtLeastOnce(requestBuilder.build(), this.options);
} catch (Throwable e) {
- TraceUtil.setWithFailure(span, e);
+ span.setStatus(e);
throw SpannerExceptionFactory.newSpannerException(e);
} finally {
- span.end(TraceUtil.END_SPAN_OPTIONS);
+ span.end();
}
}
@@ -269,6 +265,7 @@ public ReadContext singleUse(TimestampBound bound) {
.setDefaultPrefetchChunks(spanner.getDefaultPrefetchChunks())
.setDefaultDirectedReadOptions(spanner.getOptions().getDirectedReadOptions())
.setSpan(currentSpan)
+ .setTracer(tracer)
.setExecutorProvider(spanner.getAsyncExecutorProvider())
.build());
}
@@ -289,6 +286,7 @@ public ReadOnlyTransaction singleUseReadOnlyTransaction(TimestampBound bound) {
.setDefaultPrefetchChunks(spanner.getDefaultPrefetchChunks())
.setDefaultDirectedReadOptions(spanner.getOptions().getDirectedReadOptions())
.setSpan(currentSpan)
+ .setTracer(tracer)
.setExecutorProvider(spanner.getAsyncExecutorProvider())
.buildSingleUseReadOnlyTransaction());
}
@@ -309,6 +307,7 @@ public ReadOnlyTransaction readOnlyTransaction(TimestampBound bound) {
.setDefaultPrefetchChunks(spanner.getDefaultPrefetchChunks())
.setDefaultDirectedReadOptions(spanner.getOptions().getDirectedReadOptions())
.setSpan(currentSpan)
+ .setTracer(tracer)
.setExecutorProvider(spanner.getAsyncExecutorProvider())
.build());
}
@@ -325,7 +324,7 @@ public AsyncRunner runAsync(TransactionOption... options) {
@Override
public TransactionManager transactionManager(TransactionOption... options) {
- return new TransactionManagerImpl(this, currentSpan, options);
+ return new TransactionManagerImpl(this, currentSpan, tracer, options);
}
@Override
@@ -346,14 +345,14 @@ public ApiFuture asyncClose() {
@Override
public void close() {
- Span span = tracer.spanBuilder(SpannerImpl.DELETE_SESSION).startSpan();
- try (Scope s = tracer.withSpan(span)) {
+ ISpan span = tracer.spanBuilder(SpannerImpl.DELETE_SESSION);
+ try (IScope s = tracer.withSpan(span)) {
spanner.getRpc().deleteSession(name, options);
} catch (RuntimeException e) {
- TraceUtil.setWithFailure(span, e);
+ span.setStatus(e);
throw e;
} finally {
- span.end(TraceUtil.END_SPAN_OPTIONS);
+ span.end();
}
}
@@ -373,7 +372,7 @@ ApiFuture beginTransactionAsync(boolean routeToLeader) {
ApiFuture beginTransactionAsync(Options transactionOptions, boolean routeToLeader) {
final SettableApiFuture res = SettableApiFuture.create();
- final Span span = tracer.spanBuilder(SpannerImpl.BEGIN_TRANSACTION).startSpan();
+ final ISpan span = tracer.spanBuilder(SpannerImpl.BEGIN_TRANSACTION);
final BeginTransactionRequest request =
BeginTransactionRequest.newBuilder()
.setSession(name)
@@ -382,30 +381,31 @@ ApiFuture beginTransactionAsync(Options transactionOptions, boolean
final ApiFuture requestFuture =
spanner.getRpc().beginTransactionAsync(request, options, routeToLeader);
requestFuture.addListener(
- tracer.withSpan(
- span,
- () -> {
- try {
- Transaction txn = requestFuture.get();
- if (txn.getId().isEmpty()) {
- throw newSpannerException(
- ErrorCode.INTERNAL, "Missing id in transaction\n" + getName());
- }
- span.end(TraceUtil.END_SPAN_OPTIONS);
- res.set(txn.getId());
- } catch (ExecutionException e) {
- TraceUtil.endSpanWithFailure(span, e);
- res.setException(
- SpannerExceptionFactory.newSpannerException(
- e.getCause() == null ? e : e.getCause()));
- } catch (InterruptedException e) {
- TraceUtil.endSpanWithFailure(span, e);
- res.setException(SpannerExceptionFactory.propagateInterrupt(e));
- } catch (Exception e) {
- TraceUtil.endSpanWithFailure(span, e);
- res.setException(e);
- }
- }),
+ () -> {
+ try (IScope s = tracer.withSpan(span)) {
+ Transaction txn = requestFuture.get();
+ if (txn.getId().isEmpty()) {
+ throw newSpannerException(
+ ErrorCode.INTERNAL, "Missing id in transaction\n" + getName());
+ }
+ span.end();
+ res.set(txn.getId());
+ } catch (ExecutionException e) {
+ span.setStatus(e);
+ span.end();
+ res.setException(
+ SpannerExceptionFactory.newSpannerException(
+ e.getCause() == null ? e : e.getCause()));
+ } catch (InterruptedException e) {
+ span.setStatus(e);
+ span.end();
+ res.setException(SpannerExceptionFactory.propagateInterrupt(e));
+ } catch (Exception e) {
+ span.setStatus(e);
+ span.end();
+ res.setException(e);
+ }
+ },
MoreExecutors.directExecutor());
return res;
}
@@ -424,6 +424,7 @@ TransactionContextImpl newTransaction(Options options) {
.setDefaultQueryOptions(spanner.getDefaultQueryOptions(databaseId))
.setDefaultPrefetchChunks(spanner.getDefaultPrefetchChunks())
.setSpan(currentSpan)
+ .setTracer(tracer)
.setExecutorProvider(spanner.getAsyncExecutorProvider())
.setClock(poolMaintainerClock == null ? new Clock() : poolMaintainerClock)
.build();
@@ -446,4 +447,8 @@ T setActive(@Nullable T ctx) {
boolean hasReadyTransaction() {
return readyTransactionId != null;
}
+
+ TraceWrapper getTracer() {
+ return tracer;
+ }
}
diff --git a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/SessionPool.java b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/SessionPool.java
index 54a0a292cd8..cc24dd2ba0f 100644
--- a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/SessionPool.java
+++ b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/SessionPool.java
@@ -22,17 +22,21 @@
import static com.google.cloud.spanner.MetricRegistryConstants.MAX_ALLOWED_SESSIONS_DESCRIPTION;
import static com.google.cloud.spanner.MetricRegistryConstants.MAX_IN_USE_SESSIONS;
import static com.google.cloud.spanner.MetricRegistryConstants.MAX_IN_USE_SESSIONS_DESCRIPTION;
+import static com.google.cloud.spanner.MetricRegistryConstants.METRIC_PREFIX;
import static com.google.cloud.spanner.MetricRegistryConstants.NUM_ACQUIRED_SESSIONS;
import static com.google.cloud.spanner.MetricRegistryConstants.NUM_ACQUIRED_SESSIONS_DESCRIPTION;
import static com.google.cloud.spanner.MetricRegistryConstants.NUM_IN_USE_SESSIONS;
import static com.google.cloud.spanner.MetricRegistryConstants.NUM_READ_SESSIONS;
import static com.google.cloud.spanner.MetricRegistryConstants.NUM_RELEASED_SESSIONS;
import static com.google.cloud.spanner.MetricRegistryConstants.NUM_RELEASED_SESSIONS_DESCRIPTION;
+import static com.google.cloud.spanner.MetricRegistryConstants.NUM_SESSIONS_AVAILABLE;
import static com.google.cloud.spanner.MetricRegistryConstants.NUM_SESSIONS_BEING_PREPARED;
import static com.google.cloud.spanner.MetricRegistryConstants.NUM_SESSIONS_IN_POOL;
import static com.google.cloud.spanner.MetricRegistryConstants.NUM_SESSIONS_IN_POOL_DESCRIPTION;
+import static com.google.cloud.spanner.MetricRegistryConstants.NUM_SESSIONS_IN_USE;
import static com.google.cloud.spanner.MetricRegistryConstants.NUM_WRITE_SESSIONS;
import static com.google.cloud.spanner.MetricRegistryConstants.SESSIONS_TIMEOUTS_DESCRIPTION;
+import static com.google.cloud.spanner.MetricRegistryConstants.SESSIONS_TYPE;
import static com.google.cloud.spanner.MetricRegistryConstants.SPANNER_DEFAULT_LABEL_VALUES;
import static com.google.cloud.spanner.MetricRegistryConstants.SPANNER_LABEL_KEYS;
import static com.google.cloud.spanner.MetricRegistryConstants.SPANNER_LABEL_KEYS_WITH_TYPE;
@@ -62,7 +66,6 @@
import com.google.common.base.Preconditions;
import com.google.common.base.Supplier;
import com.google.common.collect.ImmutableList;
-import com.google.common.collect.ImmutableMap;
import com.google.common.util.concurrent.ForwardingListenableFuture;
import com.google.common.util.concurrent.ForwardingListenableFuture.SimpleForwardingListenableFuture;
import com.google.common.util.concurrent.ListenableFuture;
@@ -71,20 +74,16 @@
import com.google.protobuf.Empty;
import com.google.spanner.v1.BatchWriteResponse;
import com.google.spanner.v1.ResultSetStats;
-import io.opencensus.common.Scope;
import io.opencensus.metrics.DerivedLongCumulative;
import io.opencensus.metrics.DerivedLongGauge;
import io.opencensus.metrics.LabelValue;
import io.opencensus.metrics.MetricOptions;
import io.opencensus.metrics.MetricRegistry;
import io.opencensus.metrics.Metrics;
-import io.opencensus.trace.Annotation;
-import io.opencensus.trace.AttributeValue;
-import io.opencensus.trace.BlankSpan;
-import io.opencensus.trace.Span;
-import io.opencensus.trace.Status;
-import io.opencensus.trace.Tracer;
-import io.opencensus.trace.Tracing;
+import io.opentelemetry.api.OpenTelemetry;
+import io.opentelemetry.api.common.Attributes;
+import io.opentelemetry.api.common.AttributesBuilder;
+import io.opentelemetry.api.metrics.Meter;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.Iterator;
@@ -117,7 +116,7 @@
class SessionPool {
private static final Logger logger = Logger.getLogger(SessionPool.class.getName());
- private static final Tracer tracer = Tracing.getTracer();
+ private final TraceWrapper tracer;
static final String WAIT_FOR_SESSION = "SessionPool.WaitForSession";
/**
@@ -1094,7 +1093,7 @@ private enum SessionState {
}
private PooledSessionFuture createPooledSessionFuture(
- ListenableFuture future, Span span) {
+ ListenableFuture future, ISpan span) {
return new PooledSessionFuture(future, span);
}
@@ -1103,10 +1102,10 @@ class PooledSessionFuture extends SimpleForwardingListenableFuture delegate, Span span) {
+ PooledSessionFuture(ListenableFuture delegate, ISpan span) {
super(delegate);
this.span = span;
}
@@ -1338,7 +1337,7 @@ PooledSession get(final boolean eligibleForLongRunning) {
}
if (res != null) {
res.markBusy(span);
- span.addAnnotation(sessionAnnotation(res));
+ span.addAnnotation("Using Session", "sessionId", res.getName());
synchronized (lock) {
incrementNumSessionsInUse();
checkedOutSessions.add(this);
@@ -1572,8 +1571,8 @@ public void prepareReadWriteTransaction() {
private void keepAlive() {
markUsed();
- final Span previousSpan = delegate.getCurrentSpan();
- delegate.setCurrentSpan(BlankSpan.INSTANCE);
+ final ISpan previousSpan = delegate.getCurrentSpan();
+ delegate.setCurrentSpan(tracer.getBlankSpan());
try (ResultSet resultSet =
delegate
.singleUse(TimestampBound.ofMaxStaleness(60, TimeUnit.SECONDS))
@@ -1612,7 +1611,7 @@ private Dialect determineDialect() {
}
}
- private void markBusy(Span span) {
+ private void markBusy(ISpan span) {
this.delegate.setCurrentSpan(span);
this.state = SessionState.BUSY;
}
@@ -1652,14 +1651,14 @@ private void put(SpannerException e) {
public PooledSession get() {
long currentTimeout = options.getInitialWaitForSessionTimeoutMillis();
while (true) {
- Span span = tracer.spanBuilder(WAIT_FOR_SESSION).startSpan();
- try (Scope waitScope = tracer.withSpan(span)) {
+ ISpan span = tracer.spanBuilder(WAIT_FOR_SESSION);
+ try (IScope waitScope = tracer.withSpan(span)) {
PooledSession s =
pollUninterruptiblyWithTimeout(currentTimeout, options.getAcquireSessionTimeout());
if (s == null) {
// Set the status to DEADLINE_EXCEEDED and retry.
numWaiterTimeouts.incrementAndGet();
- tracer.getCurrentSpan().setStatus(Status.DEADLINE_EXCEEDED);
+ tracer.getCurrentSpan().setStatus(ErrorCode.DEADLINE_EXCEEDED);
currentTimeout = Math.min(currentTimeout * 2, MAX_SESSION_WAIT_TIMEOUT);
} else {
return s;
@@ -1668,12 +1667,12 @@ public PooledSession get() {
if (e instanceof SpannerException
&& ErrorCode.RESOURCE_EXHAUSTED.equals(((SpannerException) e).getErrorCode())) {
numWaiterTimeouts.incrementAndGet();
- tracer.getCurrentSpan().setStatus(Status.RESOURCE_EXHAUSTED);
+ tracer.getCurrentSpan().setStatus(ErrorCode.RESOURCE_EXHAUSTED);
}
- TraceUtil.setWithFailure(span, e);
+ span.setStatus(e);
throw e;
} finally {
- span.end(TraceUtil.END_SPAN_OPTIONS);
+ span.end();
}
}
}
@@ -2081,7 +2080,11 @@ enum Position {
* be created.
*/
static SessionPool createPool(
- SpannerOptions spannerOptions, SessionClient sessionClient, List labelValues) {
+ SpannerOptions spannerOptions,
+ SessionClient sessionClient,
+ TraceWrapper tracer,
+ List labelValues,
+ Attributes attributes) {
final SessionPoolOptions sessionPoolOptions = spannerOptions.getSessionPoolOptions();
// A clock instance is passed in {@code SessionPoolOptions} in order to allow mocking via tests.
@@ -2094,14 +2097,26 @@ static SessionPool createPool(
poolMaintainerClock == null ? new Clock() : poolMaintainerClock,
Position.RANDOM,
Metrics.getMetricRegistry(),
- labelValues);
+ tracer,
+ labelValues,
+ spannerOptions.getOpenTelemetry(),
+ attributes);
}
static SessionPool createPool(
SessionPoolOptions poolOptions,
ExecutorFactory executorFactory,
- SessionClient sessionClient) {
- return createPool(poolOptions, executorFactory, sessionClient, new Clock(), Position.RANDOM);
+ SessionClient sessionClient,
+ TraceWrapper tracer,
+ OpenTelemetry openTelemetry) {
+ return createPool(
+ poolOptions,
+ executorFactory,
+ sessionClient,
+ new Clock(),
+ Position.RANDOM,
+ tracer,
+ openTelemetry);
}
static SessionPool createPool(
@@ -2109,7 +2124,9 @@ static SessionPool createPool(
ExecutorFactory executorFactory,
SessionClient sessionClient,
Clock clock,
- Position initialReleasePosition) {
+ Position initialReleasePosition,
+ TraceWrapper tracer,
+ OpenTelemetry openTelemetry) {
return createPool(
poolOptions,
null,
@@ -2118,7 +2135,10 @@ static SessionPool createPool(
clock,
initialReleasePosition,
Metrics.getMetricRegistry(),
- SPANNER_DEFAULT_LABEL_VALUES);
+ tracer,
+ SPANNER_DEFAULT_LABEL_VALUES,
+ openTelemetry,
+ null);
}
static SessionPool createPool(
@@ -2129,7 +2149,10 @@ static SessionPool createPool(
Clock clock,
Position initialReleasePosition,
MetricRegistry metricRegistry,
- List labelValues) {
+ TraceWrapper tracer,
+ List labelValues,
+ OpenTelemetry openTelemetry,
+ Attributes attributes) {
SessionPool pool =
new SessionPool(
poolOptions,
@@ -2140,7 +2163,10 @@ static SessionPool createPool(
clock,
initialReleasePosition,
metricRegistry,
- labelValues);
+ tracer,
+ labelValues,
+ openTelemetry,
+ attributes);
pool.initPool();
return pool;
}
@@ -2154,7 +2180,10 @@ private SessionPool(
Clock clock,
Position initialReleasePosition,
MetricRegistry metricRegistry,
- List labelValues) {
+ TraceWrapper tracer,
+ List labelValues,
+ OpenTelemetry openTelemetry,
+ Attributes attributes) {
this.options = options;
this.databaseRole = databaseRole;
this.executorFactory = executorFactory;
@@ -2163,7 +2192,9 @@ private SessionPool(
this.clock = clock;
this.initialReleasePosition = initialReleasePosition;
this.poolMaintainer = new PoolMaintainer();
- this.initMetricsCollection(metricRegistry, labelValues);
+ this.tracer = tracer;
+ this.initOpenCensusMetricsCollection(metricRegistry, labelValues);
+ this.initOpenTelemetryMetricsCollection(openTelemetry, attributes);
this.waitOnMinSessionsLatch =
options.getMinSessions() > 0 ? new CountDownLatch(1) : new CountDownLatch(0);
}
@@ -2351,7 +2382,7 @@ boolean isValid() {
*
*/
PooledSessionFuture getSession() throws SpannerException {
- Span span = Tracing.getTracer().getCurrentSpan();
+ ISpan span = tracer.getCurrentSpan();
span.addAnnotation("Acquiring session");
WaiterFuture waiter = null;
PooledSession sess = null;
@@ -2383,7 +2414,7 @@ PooledSessionFuture getSession() throws SpannerException {
}
private PooledSessionFuture checkoutSession(
- final Span span, final PooledSession readySession, WaiterFuture waiter) {
+ final ISpan span, final PooledSession readySession, WaiterFuture waiter) {
ListenableFuture sessionFuture;
if (waiter != null) {
logger.log(
@@ -2416,12 +2447,6 @@ PooledSessionFuture replaceSession(SessionNotFoundException e, PooledSessionFutu
}
}
- private Annotation sessionAnnotation(Session session) {
- AttributeValue sessionId = AttributeValue.stringAttributeValue(session.getName());
- return Annotation.fromDescriptionAndAttributes(
- "Using Session", ImmutableMap.of("sessionId", sessionId));
- }
-
private void incrementNumSessionsInUse() {
synchronized (lock) {
if (maxSessionsInUse < ++numSessionsInUse) {
@@ -2432,7 +2457,7 @@ private void incrementNumSessionsInUse() {
}
private void maybeCreateSession() {
- Span span = Tracing.getTracer().getCurrentSpan();
+ ISpan span = tracer.getCurrentSpan();
synchronized (lock) {
if (numWaiters() >= numSessionsBeingCreated) {
if (canCreateSession()) {
@@ -2799,13 +2824,17 @@ public void onSessionCreateFailure(Throwable t, int createFailureForSessionCount
}
/**
- * Initializes and creates Spanner session relevant metrics. When coupled with an exporter, it
- * allows users to monitor client behavior.
+ * Initializes and creates Spanner session relevant metrics using OpenCensus. When coupled with an
+ * exporter, it allows users to monitor client behavior.
*/
- private void initMetricsCollection(MetricRegistry metricRegistry, List labelValues) {
+ private void initOpenCensusMetricsCollection(
+ MetricRegistry metricRegistry, List labelValues) {
+ if (!SpannerOptions.isEnabledOpenCensusMetrics()) {
+ return;
+ }
DerivedLongGauge maxInUseSessionsMetric =
metricRegistry.addDerivedLongGauge(
- MAX_IN_USE_SESSIONS,
+ METRIC_PREFIX + MAX_IN_USE_SESSIONS,
MetricOptions.builder()
.setDescription(MAX_IN_USE_SESSIONS_DESCRIPTION)
.setUnit(COUNT)
@@ -2814,7 +2843,7 @@ private void initMetricsCollection(MetricRegistry metricRegistry, List 0L);
}
+
+ /**
+ * Initializes and creates Spanner session relevant metrics using OpenTelemetry. When coupled with
+ * an exporter, it allows users to monitor client behavior.
+ */
+ private void initOpenTelemetryMetricsCollection(
+ OpenTelemetry openTelemetry, Attributes attributes) {
+ if (openTelemetry == null || !SpannerOptions.isEnabledOpenTelemetryMetrics()) {
+ return;
+ }
+
+ Meter meter = openTelemetry.getMeter(MetricRegistryConstants.INSTRUMENTATION_SCOPE);
+ meter
+ .gaugeBuilder(MAX_ALLOWED_SESSIONS)
+ .setDescription(MAX_ALLOWED_SESSIONS_DESCRIPTION)
+ .setUnit(COUNT)
+ .buildWithCallback(
+ measurement -> {
+ // Although Max sessions is a constant value, OpenTelemetry requires to define this as
+ // a callback.
+ measurement.record(options.getMaxSessions(), attributes);
+ });
+
+ meter
+ .gaugeBuilder(MAX_IN_USE_SESSIONS)
+ .setDescription(MAX_IN_USE_SESSIONS_DESCRIPTION)
+ .setUnit(COUNT)
+ .buildWithCallback(
+ measurement -> {
+ measurement.record(this.maxSessionsInUse, attributes);
+ });
+
+ AttributesBuilder attributesBuilder;
+ if (attributes != null) {
+ attributesBuilder = attributes.toBuilder();
+ } else {
+ attributesBuilder = Attributes.builder();
+ }
+ Attributes attributesInUseSessions =
+ attributesBuilder.put(SESSIONS_TYPE, NUM_SESSIONS_IN_USE).build();
+ Attributes attributesAvailableSessions =
+ attributesBuilder.put(SESSIONS_TYPE, NUM_SESSIONS_AVAILABLE).build();
+ meter
+ .upDownCounterBuilder(NUM_SESSIONS_IN_POOL)
+ .setDescription(NUM_SESSIONS_IN_POOL_DESCRIPTION)
+ .setUnit(COUNT)
+ .buildWithCallback(
+ measurement -> {
+ measurement.record(this.numSessionsInUse, attributesInUseSessions);
+ measurement.record(this.sessions.size(), attributesAvailableSessions);
+ });
+
+ meter
+ .counterBuilder(GET_SESSION_TIMEOUTS)
+ .setDescription(SESSIONS_TIMEOUTS_DESCRIPTION)
+ .setUnit(COUNT)
+ .buildWithCallback(
+ measurement -> {
+ measurement.record(this.getNumWaiterTimeouts(), attributes);
+ });
+
+ meter
+ .counterBuilder(NUM_ACQUIRED_SESSIONS)
+ .setDescription(NUM_ACQUIRED_SESSIONS_DESCRIPTION)
+ .setUnit(COUNT)
+ .buildWithCallback(
+ measurement -> {
+ measurement.record(this.numSessionsAcquired, attributes);
+ });
+
+ meter
+ .counterBuilder(NUM_RELEASED_SESSIONS)
+ .setDescription(NUM_RELEASED_SESSIONS_DESCRIPTION)
+ .setUnit(COUNT)
+ .buildWithCallback(
+ measurement -> {
+ measurement.record(this.numSessionsReleased, attributes);
+ });
+ }
}
diff --git a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/SpannerImpl.java b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/SpannerImpl.java
index f1a7888c62c..326a51d803e 100644
--- a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/SpannerImpl.java
+++ b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/SpannerImpl.java
@@ -37,8 +37,9 @@
import com.google.common.util.concurrent.ListenableFuture;
import com.google.spanner.v1.ExecuteSqlRequest.QueryOptions;
import io.opencensus.metrics.LabelValue;
-import io.opencensus.trace.Tracer;
import io.opencensus.trace.Tracing;
+import io.opentelemetry.api.common.Attributes;
+import io.opentelemetry.api.common.AttributesBuilder;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
@@ -55,7 +56,14 @@
/** Default implementation of the Cloud Spanner interface. */
class SpannerImpl extends BaseService implements Spanner {
private static final Logger logger = Logger.getLogger(SpannerImpl.class.getName());
- static final Tracer tracer = Tracing.getTracer();
+ final TraceWrapper tracer =
+ new TraceWrapper(
+ Tracing.getTracer(),
+ this.getOptions()
+ .getOpenTelemetry()
+ .getTracer(
+ MetricRegistryConstants.INSTRUMENTATION_SCOPE,
+ GaxProperties.getLibraryVersion(this.getOptions().getClass())));
static final String CREATE_SESSION = "CloudSpannerOperation.CreateSession";
static final String BATCH_CREATE_SESSIONS = "CloudSpannerOperation.BatchCreateSessions";
@@ -148,6 +156,10 @@ QueryOptions getDefaultQueryOptions(DatabaseId databaseId) {
return getOptions().getDefaultQueryOptions(databaseId);
}
+ TraceWrapper getTracer() {
+ return this.tracer;
+ }
+
/**
* Returns the {@link ExecutorProvider} to use for async methods that need a background executor.
*/
@@ -219,9 +231,19 @@ public DatabaseClient getDatabaseClient(DatabaseId db) {
LabelValue.create(db.getDatabase()),
LabelValue.create(db.getInstanceId().getName()),
LabelValue.create(GaxProperties.getLibraryVersion(getOptions().getClass())));
+
+ AttributesBuilder attributesBuilder = Attributes.builder();
+ attributesBuilder.put("client_id", clientId);
+ attributesBuilder.put("database", db.getDatabase());
+ attributesBuilder.put("instance_id", db.getInstanceId().getName());
+
SessionPool pool =
SessionPool.createPool(
- getOptions(), SpannerImpl.this.getSessionClient(db), labelValues);
+ getOptions(),
+ SpannerImpl.this.getSessionClient(db),
+ this.tracer,
+ labelValues,
+ attributesBuilder.build());
pool.maybeWaitOnMinSessions();
DatabaseClientImpl dbClient = createDatabaseClient(clientId, pool);
dbClients.put(db, dbClient);
@@ -232,7 +254,7 @@ public DatabaseClient getDatabaseClient(DatabaseId db) {
@VisibleForTesting
DatabaseClientImpl createDatabaseClient(String clientId, SessionPool pool) {
- return new DatabaseClientImpl(clientId, pool);
+ return new DatabaseClientImpl(clientId, pool, tracer);
}
@Override
diff --git a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/SpannerOptions.java b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/SpannerOptions.java
index 0d9f3b85c19..9c6044aa938 100644
--- a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/SpannerOptions.java
+++ b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/SpannerOptions.java
@@ -19,6 +19,7 @@
import com.google.api.core.ApiFunction;
import com.google.api.core.BetaApi;
import com.google.api.core.InternalApi;
+import com.google.api.core.ObsoleteApi;
import com.google.api.gax.core.ExecutorProvider;
import com.google.api.gax.grpc.GrpcCallContext;
import com.google.api.gax.grpc.GrpcInterceptorProvider;
@@ -61,6 +62,8 @@
import io.grpc.ExperimentalApi;
import io.grpc.ManagedChannelBuilder;
import io.grpc.MethodDescriptor;
+import io.opentelemetry.api.GlobalOpenTelemetry;
+import io.opentelemetry.api.OpenTelemetry;
import java.io.IOException;
import java.net.MalformedURLException;
import java.net.URL;
@@ -75,12 +78,15 @@
import java.util.concurrent.atomic.AtomicInteger;
import javax.annotation.Nonnull;
import javax.annotation.Nullable;
+import javax.annotation.concurrent.GuardedBy;
import org.threeten.bp.Duration;
/** Options for the Cloud Spanner service. */
public class SpannerOptions extends ServiceOptions {
private static final long serialVersionUID = 2789571558532701170L;
private static SpannerEnvironment environment = SpannerEnvironmentImpl.INSTANCE;
+ private static boolean enableOpenCensusMetrics = true;
+ private static boolean enableOpenTelemetryMetrics = false;
private static final String JDBC_API_CLIENT_LIB_TOKEN = "sp-jdbc";
private static final String HIBERNATE_API_CLIENT_LIB_TOKEN = "sp-hib";
@@ -141,6 +147,17 @@ public class SpannerOptions extends ServiceOptions {
private final boolean attemptDirectPath;
private final DirectedReadOptions directedReadOptions;
private final boolean useVirtualThreads;
+ private final OpenTelemetry openTelemetry;
+
+ enum TracingFramework {
+ OPEN_CENSUS,
+ OPEN_TELEMETRY
+ }
+
+ private static final Object lock = new Object();
+
+ @GuardedBy("lock")
+ private static TracingFramework activeTracingFramework;
/** Interface that can be used to provide {@link CallCredentials} to {@link SpannerOptions}. */
public interface CallCredentialsProvider {
@@ -633,6 +650,7 @@ protected SpannerOptions(Builder builder) {
attemptDirectPath = builder.attemptDirectPath;
directedReadOptions = builder.directedReadOptions;
useVirtualThreads = builder.useVirtualThreads;
+ openTelemetry = builder.openTelemetry;
}
/**
@@ -737,6 +755,7 @@ public static class Builder
private boolean attemptDirectPath = true;
private DirectedReadOptions directedReadOptions;
private boolean useVirtualThreads = false;
+ private OpenTelemetry openTelemetry;
private static String createCustomClientLibToken(String token) {
return token + " " + ServiceOptions.getGoogApiClientLibName();
@@ -1243,6 +1262,15 @@ public Builder setEmulatorHost(String emulatorHost) {
return this;
}
+ /**
+ * Sets OpenTelemetry object to be used for Spanner Metrics and Traces. GlobalOpenTelemetry will
+ * be used as fallback if this options is not set.
+ */
+ public Builder setOpenTelemetry(OpenTelemetry openTelemetry) {
+ this.openTelemetry = openTelemetry;
+ return this;
+ }
+
/**
* Enable leader aware routing. Leader aware routing would route all requests in RW/PDML
* transactions to the leader region.
@@ -1297,6 +1325,11 @@ public SpannerOptions build() {
this.grpcGcpExtensionEnabled ? GRPC_GCP_ENABLED_DEFAULT_CHANNELS : DEFAULT_CHANNELS;
}
+ synchronized (lock) {
+ if (activeTracingFramework == null) {
+ activeTracingFramework = TracingFramework.OPEN_CENSUS;
+ }
+ }
return new SpannerOptions(this);
}
}
@@ -1326,6 +1359,77 @@ public static void useDefaultEnvironment() {
SpannerOptions.environment = SpannerEnvironmentImpl.INSTANCE;
}
+ /**
+ * Enables OpenTelemetry traces. Enabling OpenTelemetry traces will disable OpenCensus traces. By
+ * default, OpenCensus traces are enabled.
+ */
+ public static void enableOpenTelemetryTraces() {
+ synchronized (lock) {
+ if (activeTracingFramework != null
+ && activeTracingFramework != TracingFramework.OPEN_TELEMETRY) {
+ throw new IllegalStateException(
+ "ActiveTracingFramework is set to OpenCensus and cannot be reset after SpannerOptions object is created.");
+ }
+ activeTracingFramework = TracingFramework.OPEN_TELEMETRY;
+ }
+ }
+
+ /** Enables OpenCensus traces. Enabling OpenCensus traces will disable OpenTelemetry traces. */
+ @ObsoleteApi(
+ "The OpenCensus project is deprecated. Use enableOpenTelemetryTraces to switch to OpenTelemetry traces")
+ public static void enableOpenCensusTraces() {
+ synchronized (lock) {
+ if (activeTracingFramework != null
+ && activeTracingFramework != TracingFramework.OPEN_CENSUS) {
+ throw new IllegalStateException(
+ "ActiveTracingFramework is set to OpenTelemetry and cannot be reset after SpannerOptions object is created.");
+ }
+ activeTracingFramework = TracingFramework.OPEN_CENSUS;
+ }
+ }
+
+ /**
+ * Always resets the activeTracingFramework. This variable is used for internal testing, and is
+ * not a valid production scenario
+ */
+ @ObsoleteApi(
+ "The OpenCensus project is deprecated. Use enableOpenTelemetryTraces to switch to OpenTelemetry traces")
+ static void resetActiveTracingFramework() {
+ activeTracingFramework = null;
+ }
+
+ public static TracingFramework getActiveTracingFramework() {
+ synchronized (lock) {
+ if (activeTracingFramework == null) {
+ return TracingFramework.OPEN_CENSUS;
+ }
+ return activeTracingFramework;
+ }
+ }
+
+ /** Disables OpenCensus metrics. Disable OpenCensus metrics before creating Spanner client. */
+ public static void disableOpenCensusMetrics() {
+ SpannerOptions.enableOpenCensusMetrics = false;
+ }
+
+ @VisibleForTesting
+ static void enableOpenCensusMetrics() {
+ SpannerOptions.enableOpenCensusMetrics = true;
+ }
+
+ public static boolean isEnabledOpenCensusMetrics() {
+ return SpannerOptions.enableOpenCensusMetrics;
+ }
+
+ /** Enables OpenTelemetry metrics. Enable OpenTelemetry metrics before creating Spanner client. */
+ public static void enableOpenTelemetryMetrics() {
+ SpannerOptions.enableOpenTelemetryMetrics = true;
+ }
+
+ public static boolean isEnabledOpenTelemetryMetrics() {
+ return SpannerOptions.enableOpenTelemetryMetrics;
+ }
+
@Override
protected String getDefaultProject() {
String projectId = getDefaultProjectId();
@@ -1426,6 +1530,18 @@ public boolean isAttemptDirectPath() {
return attemptDirectPath;
}
+ /**
+ * Returns an instance of OpenTelemetry. If OpenTelemetry object is not set via SpannerOptions
+ * then GlobalOpenTelemetry will be used as fallback.
+ */
+ public OpenTelemetry getOpenTelemetry() {
+ if (this.openTelemetry != null) {
+ return this.openTelemetry;
+ } else {
+ return GlobalOpenTelemetry.get();
+ }
+ }
+
@BetaApi
public boolean isUseVirtualThreads() {
return useVirtualThreads;
diff --git a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/SpannerRpcMetrics.java b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/SpannerRpcMetrics.java
new file mode 100644
index 00000000000..794c211971d
--- /dev/null
+++ b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/SpannerRpcMetrics.java
@@ -0,0 +1,74 @@
+/*
+ * Copyright 2023 Google LLC
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * https://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.google.cloud.spanner;
+
+import com.google.api.core.InternalApi;
+import io.opentelemetry.api.OpenTelemetry;
+import io.opentelemetry.api.common.Attributes;
+import io.opentelemetry.api.metrics.LongCounter;
+import io.opentelemetry.api.metrics.LongHistogram;
+import io.opentelemetry.api.metrics.Meter;
+import java.util.Arrays;
+import java.util.List;
+
+@InternalApi
+public class SpannerRpcMetrics {
+ private final LongHistogram gfeLatencies;
+ private final LongCounter gfeHeaderMissingCount;
+
+ public SpannerRpcMetrics(OpenTelemetry openTelemetry) {
+ if (!SpannerOptions.isEnabledOpenTelemetryMetrics()) {
+ gfeLatencies = null;
+ gfeHeaderMissingCount = null;
+ return;
+ }
+
+ Meter meter = openTelemetry.getMeter(MetricRegistryConstants.INSTRUMENTATION_SCOPE);
+ List RPC_MILLIS_BUCKET_BOUNDARIES =
+ Arrays.asList(
+ 1L, 2L, 3L, 4L, 5L, 6L, 8L, 10L, 13L, 16L, 20L, 25L, 30L, 40L, 50L, 65L, 80L, 100L,
+ 130L, 160L, 200L, 250L, 300L, 400L, 500L, 650L, 800L, 1000L, 2000L, 5000L, 10000L,
+ 20000L, 50000L, 100000L);
+ gfeLatencies =
+ meter
+ .histogramBuilder(MetricRegistryConstants.SPANNER_GFE_LATENCY)
+ .ofLongs()
+ .setDescription(MetricRegistryConstants.SPANNER_GFE_LATENCY_DESCRIPTION)
+ .setUnit("ms")
+ .setExplicitBucketBoundariesAdvice(RPC_MILLIS_BUCKET_BOUNDARIES)
+ .build();
+ gfeHeaderMissingCount =
+ meter
+ .counterBuilder(MetricRegistryConstants.SPANNER_GFE_HEADER_MISSING_COUNT)
+ .setDescription(MetricRegistryConstants.SPANNER_GFE_HEADER_MISSING_COUNT_DESCRIPTION)
+ .setUnit(MetricRegistryConstants.COUNT)
+ .build();
+ }
+
+ @InternalApi
+ public void recordGfeLatency(long value, Attributes attributes) {
+ if (gfeLatencies != null) {
+ gfeLatencies.record(value, attributes);
+ }
+ }
+
+ @InternalApi
+ public void recordGfeHeaderMissingCount(long value, Attributes attributes) {
+ if (gfeHeaderMissingCount != null) {
+ gfeHeaderMissingCount.add(value, attributes);
+ }
+ }
+}
diff --git a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/TraceUtil.java b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/TraceUtil.java
deleted file mode 100644
index 0d429661ad2..00000000000
--- a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/TraceUtil.java
+++ /dev/null
@@ -1,82 +0,0 @@
-/*
- * Copyright 2017 Google LLC
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package com.google.cloud.spanner;
-
-import com.google.cloud.Timestamp;
-import com.google.common.collect.ImmutableMap;
-import com.google.spanner.v1.Transaction;
-import io.opencensus.contrib.grpc.util.StatusConverter;
-import io.opencensus.trace.AttributeValue;
-import io.opencensus.trace.EndSpanOptions;
-import io.opencensus.trace.Span;
-import io.opencensus.trace.Status;
-import java.util.Map;
-
-/** Utility methods for tracing. */
-class TraceUtil {
-
- static final EndSpanOptions END_SPAN_OPTIONS =
- EndSpanOptions.builder().setSampleToLocalSpanStore(true).build();
-
- static Map getTransactionAnnotations(Transaction t) {
- return ImmutableMap.of(
- "Id",
- AttributeValue.stringAttributeValue(t.getId().toStringUtf8()),
- "Timestamp",
- AttributeValue.stringAttributeValue(Timestamp.fromProto(t.getReadTimestamp()).toString()));
- }
-
- static ImmutableMap getExceptionAnnotations(Throwable e) {
- if (e instanceof SpannerException) {
- return ImmutableMap.of(
- "Status",
- AttributeValue.stringAttributeValue(((SpannerException) e).getErrorCode().toString()));
- }
- return ImmutableMap.of();
- }
-
- static ImmutableMap getExceptionAnnotations(SpannerException e) {
- return ImmutableMap.of(
- "Status", AttributeValue.stringAttributeValue(e.getErrorCode().toString()));
- }
-
- static void setWithFailure(Span span, Throwable e) {
- if (e instanceof SpannerException) {
- span.setStatus(
- StatusConverter.fromGrpcStatus(((SpannerException) e).getErrorCode().getGrpcStatus())
- .withDescription(e.getMessage()));
- } else {
- span.setStatus(Status.INTERNAL.withDescription(e.getMessage()));
- }
- }
-
- static void endSpanWithFailure(Span span, Throwable e) {
- if (e instanceof SpannerException) {
- endSpanWithFailure(span, (SpannerException) e);
- } else {
- span.setStatus(Status.INTERNAL.withDescription(e.getMessage()));
- span.end(END_SPAN_OPTIONS);
- }
- }
-
- static void endSpanWithFailure(Span span, SpannerException e) {
- span.setStatus(
- StatusConverter.fromGrpcStatus(e.getErrorCode().getGrpcStatus())
- .withDescription(e.getMessage()));
- span.end(END_SPAN_OPTIONS);
- }
-}
diff --git a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/TraceWrapper.java b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/TraceWrapper.java
new file mode 100644
index 00000000000..25796968e9e
--- /dev/null
+++ b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/TraceWrapper.java
@@ -0,0 +1,109 @@
+/*
+ * Copyright 2023 Google LLC
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.google.cloud.spanner;
+
+import com.google.cloud.spanner.SpannerOptions.TracingFramework;
+import io.opencensus.trace.BlankSpan;
+import io.opencensus.trace.Span;
+import io.opencensus.trace.Tracer;
+import io.opentelemetry.context.Context;
+
+class TraceWrapper {
+
+ private final Tracer openCensusTracer;
+ private final io.opentelemetry.api.trace.Tracer openTelemetryTracer;
+
+ TraceWrapper(Tracer openCensusTracer, io.opentelemetry.api.trace.Tracer openTelemetryTracer) {
+ this.openTelemetryTracer = openTelemetryTracer;
+ this.openCensusTracer = openCensusTracer;
+ }
+
+ ISpan spanBuilder(String spanName) {
+ if (SpannerOptions.getActiveTracingFramework().equals(TracingFramework.OPEN_TELEMETRY)) {
+ return new OpenTelemetrySpan(openTelemetryTracer.spanBuilder(spanName).startSpan());
+ } else {
+ return new OpenCensusSpan(openCensusTracer.spanBuilder(spanName).startSpan());
+ }
+ }
+
+ ISpan spanBuilderWithExplicitParent(String spanName, ISpan parentSpan) {
+ if (SpannerOptions.getActiveTracingFramework().equals(TracingFramework.OPEN_TELEMETRY)) {
+ OpenTelemetrySpan otParentSpan = (OpenTelemetrySpan) parentSpan;
+
+ io.opentelemetry.api.trace.Span otSpan;
+
+ if (otParentSpan != null && otParentSpan.getOpenTelemetrySpan() != null) {
+ otSpan =
+ openTelemetryTracer
+ .spanBuilder(spanName)
+ .setParent(Context.current().with(otParentSpan.getOpenTelemetrySpan()))
+ .startSpan();
+ } else {
+ otSpan = openTelemetryTracer.spanBuilder(spanName).startSpan();
+ }
+
+ return new OpenTelemetrySpan(otSpan);
+
+ } else {
+ OpenCensusSpan parentOcSpan = (OpenCensusSpan) parentSpan;
+ Span ocSpan =
+ openCensusTracer
+ .spanBuilderWithExplicitParent(
+ spanName, parentOcSpan != null ? parentOcSpan.getOpenCensusSpan() : null)
+ .startSpan();
+
+ return new OpenCensusSpan(ocSpan);
+ }
+ }
+
+ ISpan getCurrentSpan() {
+ if (SpannerOptions.getActiveTracingFramework().equals(TracingFramework.OPEN_TELEMETRY)) {
+ return new OpenTelemetrySpan(
+ io.opentelemetry.api.trace.Span.fromContext(io.opentelemetry.context.Context.current()));
+ } else {
+ return new OpenCensusSpan(openCensusTracer.getCurrentSpan());
+ }
+ }
+
+ ISpan getBlankSpan() {
+ if (SpannerOptions.getActiveTracingFramework().equals(TracingFramework.OPEN_TELEMETRY)) {
+ return new OpenTelemetrySpan(io.opentelemetry.api.trace.Span.getInvalid());
+ } else {
+ return new OpenCensusSpan(BlankSpan.INSTANCE);
+ }
+ }
+
+ IScope withSpan(ISpan span) {
+ if (SpannerOptions.getActiveTracingFramework().equals(TracingFramework.OPEN_TELEMETRY)) {
+ OpenTelemetrySpan openTelemetrySpan;
+ if (!(span instanceof OpenTelemetrySpan)) {
+ openTelemetrySpan = new OpenTelemetrySpan(null);
+ } else {
+ openTelemetrySpan = (OpenTelemetrySpan) span;
+ }
+ return new OpenTelemetryScope(openTelemetrySpan.getOpenTelemetrySpan().makeCurrent());
+ } else {
+ OpenCensusSpan openCensusSpan;
+ if (!(span instanceof OpenCensusSpan)) {
+ openCensusSpan = new OpenCensusSpan(null);
+ } else {
+ openCensusSpan = (OpenCensusSpan) span;
+ }
+ return new OpenCensusScope(openCensusTracer.withSpan(openCensusSpan.getOpenCensusSpan()));
+ }
+ }
+}
diff --git a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/TransactionManagerImpl.java b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/TransactionManagerImpl.java
index 2d47fcd5c78..95ffd1168b2 100644
--- a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/TransactionManagerImpl.java
+++ b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/TransactionManagerImpl.java
@@ -20,41 +20,39 @@
import com.google.cloud.spanner.Options.TransactionOption;
import com.google.cloud.spanner.SessionImpl.SessionTransaction;
import com.google.common.base.Preconditions;
-import io.opencensus.common.Scope;
-import io.opencensus.trace.Span;
-import io.opencensus.trace.Tracer;
-import io.opencensus.trace.Tracing;
/** Implementation of {@link TransactionManager}. */
final class TransactionManagerImpl implements TransactionManager, SessionTransaction {
- private static final Tracer tracer = Tracing.getTracer();
+ private final TraceWrapper tracer;
private final SessionImpl session;
- private Span span;
+ private ISpan span;
private final Options options;
private TransactionRunnerImpl.TransactionContextImpl txn;
private TransactionState txnState;
- TransactionManagerImpl(SessionImpl session, Span span, TransactionOption... options) {
+ TransactionManagerImpl(
+ SessionImpl session, ISpan span, TraceWrapper tracer, TransactionOption... options) {
this.session = session;
this.span = span;
+ this.tracer = tracer;
this.options = Options.fromTransactionOptions(options);
}
- Span getSpan() {
+ ISpan getSpan() {
return span;
}
@Override
- public void setSpan(Span span) {
+ public void setSpan(ISpan span) {
this.span = span;
}
@Override
public TransactionContext begin() {
Preconditions.checkState(txn == null, "begin can only be called once");
- try (Scope s = tracer.withSpan(span)) {
+ try (IScope s = tracer.withSpan(span)) {
txn = session.newTransaction(options);
session.setActive(this);
txnState = TransactionState.STARTED;
@@ -102,7 +100,7 @@ public TransactionContext resetForRetry() {
throw new IllegalStateException(
"resetForRetry can only be called if the previous attempt" + " aborted");
}
- try (Scope s = tracer.withSpan(span)) {
+ try (IScope s = tracer.withSpan(span)) {
boolean useInlinedBegin = txn.transactionId != null;
txn = session.newTransaction(options);
if (!useInlinedBegin) {
@@ -137,7 +135,7 @@ public void close() {
txnState = TransactionState.ROLLED_BACK;
}
} finally {
- span.end(TraceUtil.END_SPAN_OPTIONS);
+ span.end();
}
}
diff --git a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/TransactionRunnerImpl.java b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/TransactionRunnerImpl.java
index 5a0cd3618e8..3249be1bdb3 100644
--- a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/TransactionRunnerImpl.java
+++ b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/TransactionRunnerImpl.java
@@ -49,11 +49,6 @@
import com.google.spanner.v1.Transaction;
import com.google.spanner.v1.TransactionOptions;
import com.google.spanner.v1.TransactionSelector;
-import io.opencensus.common.Scope;
-import io.opencensus.trace.AttributeValue;
-import io.opencensus.trace.Span;
-import io.opencensus.trace.Tracer;
-import io.opencensus.trace.Tracing;
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;
@@ -71,8 +66,6 @@
/** Default implementation of {@link TransactionRunner}. */
class TransactionRunnerImpl implements SessionTransaction, TransactionRunner {
-
- private static final Tracer tracer = Tracing.getTracer();
private static final Logger txnLogger = Logger.getLogger(TransactionRunner.class.getName());
/**
* (Part of) the error message that is returned by Cloud Spanner if a transaction is cancelled
@@ -257,10 +250,7 @@ ApiFuture ensureTxnAsync() {
if (transactionId == null || isAborted()) {
createTxnAsync(res);
} else {
- span.addAnnotation(
- "Transaction Initialized",
- ImmutableMap.of(
- "Id", AttributeValue.stringAttributeValue(transactionId.toStringUtf8())));
+ span.addAnnotation("Transaction Initialized", "Id", transactionId.toStringUtf8());
txnLogger.log(
Level.FINER,
"Using prepared transaction {0}",
@@ -277,10 +267,7 @@ private void createTxnAsync(final SettableApiFuture res) {
() -> {
try {
transactionId = fut.get();
- span.addAnnotation(
- "Transaction Creation Done",
- ImmutableMap.of(
- "Id", AttributeValue.stringAttributeValue(transactionId.toStringUtf8())));
+ span.addAnnotation("Transaction Creation Done", "Id", transactionId.toStringUtf8());
txnLogger.log(
Level.FINER,
"Started transaction {0}",
@@ -288,8 +275,7 @@ private void createTxnAsync(final SettableApiFuture res) {
res.set(null);
} catch (ExecutionException e) {
span.addAnnotation(
- "Transaction Creation Failed",
- TraceUtil.getExceptionAnnotations(e.getCause() == null ? e : e.getCause()));
+ "Transaction Creation Failed", e.getCause() == null ? e : e.getCause());
res.setException(e.getCause() == null ? e : e.getCause());
} catch (InterruptedException e) {
res.setException(SpannerExceptionFactory.propagateInterrupt(e));
@@ -405,39 +391,37 @@ public void run() {
}
final CommitRequest commitRequest = requestBuilder.build();
span.addAnnotation("Starting Commit");
- final Span opSpan =
- tracer.spanBuilderWithExplicitParent(SpannerImpl.COMMIT, span).startSpan();
+ final ISpan opSpan = tracer.spanBuilderWithExplicitParent(SpannerImpl.COMMIT, span);
final ApiFuture commitFuture =
rpc.commitAsync(commitRequest, session.getOptions());
session.markUsed(clock.instant());
commitFuture.addListener(
- tracer.withSpan(
- opSpan,
- () -> {
- try {
- com.google.spanner.v1.CommitResponse proto = commitFuture.get();
- if (!proto.hasCommitTimestamp()) {
- throw newSpannerException(
- ErrorCode.INTERNAL, "Missing commitTimestamp:\n" + session.getName());
- }
- span.addAnnotation("Commit Done");
- opSpan.end(TraceUtil.END_SPAN_OPTIONS);
- res.set(new CommitResponse(proto));
- } catch (Throwable e) {
- if (e instanceof ExecutionException) {
- e =
- SpannerExceptionFactory.newSpannerException(
- e.getCause() == null ? e : e.getCause());
- } else if (e instanceof InterruptedException) {
- e = SpannerExceptionFactory.propagateInterrupt((InterruptedException) e);
- } else {
- e = SpannerExceptionFactory.newSpannerException(e);
- }
- span.addAnnotation("Commit Failed", TraceUtil.getExceptionAnnotations(e));
- TraceUtil.endSpanWithFailure(opSpan, e);
- res.setException(onError((SpannerException) e, false));
- }
- }),
+ () -> {
+ try (IScope s = tracer.withSpan(opSpan)) {
+ com.google.spanner.v1.CommitResponse proto = commitFuture.get();
+ if (!proto.hasCommitTimestamp()) {
+ throw newSpannerException(
+ ErrorCode.INTERNAL, "Missing commitTimestamp:\n" + session.getName());
+ }
+ span.addAnnotation("Commit Done");
+ opSpan.end();
+ res.set(new CommitResponse(proto));
+ } catch (Throwable e) {
+ if (e instanceof ExecutionException) {
+ e =
+ SpannerExceptionFactory.newSpannerException(
+ e.getCause() == null ? e : e.getCause());
+ } else if (e instanceof InterruptedException) {
+ e = SpannerExceptionFactory.propagateInterrupt((InterruptedException) e);
+ } else {
+ e = SpannerExceptionFactory.newSpannerException(e);
+ }
+ span.addAnnotation("Commit Failed", e);
+ opSpan.setStatus(e);
+ opSpan.end();
+ res.setException(onError((SpannerException) e, false));
+ }
+ },
MoreExecutors.directExecutor());
} catch (InterruptedException e) {
res.setException(SpannerExceptionFactory.propagateInterrupt(e));
@@ -446,6 +430,9 @@ public void run() {
} catch (ExecutionException e) {
res.setException(
SpannerExceptionFactory.newSpannerException(e.getCause() == null ? e : e.getCause()));
+ } catch (Throwable e) {
+ res.setException(
+ SpannerExceptionFactory.newSpannerException(e.getCause() == null ? e : e.getCause()));
}
}
}
@@ -466,7 +453,7 @@ void rollback() {
rollbackAsync().get();
} catch (ExecutionException e) {
txnLogger.log(Level.FINE, "Exception during rollback", e);
- span.addAnnotation("Rollback Failed", TraceUtil.getExceptionAnnotations(e));
+ span.addAnnotation("Rollback Failed", e);
} catch (InterruptedException e) {
throw SpannerExceptionFactory.propagateInterrupt(e);
}
@@ -962,7 +949,8 @@ public ListenableAsyncResultSet executeQueryAsync(
private boolean blockNestedTxn = true;
private final SessionImpl session;
private final Options options;
- private Span span;
+ private ISpan span;
+ private TraceWrapper tracer;
private TransactionContextImpl txn;
private volatile boolean isValid = true;
@@ -976,29 +964,31 @@ public TransactionRunner allowNestedTransaction() {
this.session = session;
this.options = Options.fromTransactionOptions(options);
this.txn = session.newTransaction(this.options);
+ this.tracer = session.getTracer();
}
@Override
- public void setSpan(Span span) {
+ public void setSpan(ISpan span) {
this.span = span;
}
@Nullable
@Override
public T run(TransactionCallable callable) {
- try (Scope s = tracer.withSpan(span)) {
+ try (IScope s = tracer.withSpan(span)) {
if (blockNestedTxn) {
SessionImpl.hasPendingTransaction.set(Boolean.TRUE);
}
return runInternal(callable);
} catch (RuntimeException e) {
- TraceUtil.setWithFailure(span, e);
+ span.setStatus(e);
throw e;
} finally {
// Remove threadLocal rather than set to FALSE to avoid a possible memory leak.
// We also do this unconditionally in case a user has modified the flag when the transaction
// was running.
SessionImpl.hasPendingTransaction.remove();
+ span.end();
}
}
@@ -1016,9 +1006,7 @@ private T runInternal(final TransactionCallable txCallable) {
checkState(
isValid, "TransactionRunner has been invalidated by a new operation on the session");
attempt.incrementAndGet();
- span.addAnnotation(
- "Starting Transaction Attempt",
- ImmutableMap.of("Attempt", AttributeValue.longAttributeValue(attempt.longValue())));
+ span.addAnnotation("Starting Transaction Attempt", "Attempt", attempt.longValue());
// Only ensure that there is a transaction if we should not inline the beginTransaction
// with the first statement.
if (!useInlinedBegin) {
@@ -1035,8 +1023,8 @@ private T runInternal(final TransactionCallable txCallable) {
if (txn.isAborted() || (e instanceof AbortedException)) {
span.addAnnotation(
"Transaction Attempt Aborted in user operation. Retrying",
- ImmutableMap.of(
- "Attempt", AttributeValue.longAttributeValue(attempt.longValue())));
+ "Attempt",
+ attempt.longValue());
shouldRollback = false;
if (e instanceof AbortedException) {
throw e;
@@ -1052,10 +1040,8 @@ private T runInternal(final TransactionCallable txCallable) {
}
span.addAnnotation(
"Transaction Attempt Failed in user operation",
- ImmutableMap.builder()
- .putAll(TraceUtil.getExceptionAnnotations(toThrow))
- .put("Attempt", AttributeValue.longAttributeValue(attempt.longValue()))
- .build());
+ ImmutableMap.of(
+ "Attempt", attempt.longValue(), "Status", toThrow.getErrorCode().toString()));
throw toThrow;
} finally {
if (shouldRollback) {
@@ -1065,23 +1051,18 @@ private T runInternal(final TransactionCallable txCallable) {
try {
txn.commit();
- span.addAnnotation(
- "Transaction Attempt Succeeded",
- ImmutableMap.of("Attempt", AttributeValue.longAttributeValue(attempt.longValue())));
+ span.addAnnotation("Transaction Attempt Succeeded", "Attempt", attempt.longValue());
return result;
} catch (AbortedException e) {
txnLogger.log(Level.FINE, "Commit aborted", e);
span.addAnnotation(
- "Transaction Attempt Aborted in Commit. Retrying",
- ImmutableMap.of("Attempt", AttributeValue.longAttributeValue(attempt.longValue())));
+ "Transaction Attempt Aborted in Commit. Retrying", "Attempt", attempt.longValue());
throw e;
} catch (SpannerException e) {
span.addAnnotation(
"Transaction Attempt Failed in Commit",
- ImmutableMap.builder()
- .putAll(TraceUtil.getExceptionAnnotations(e))
- .put("Attempt", AttributeValue.longAttributeValue(attempt.longValue()))
- .build());
+ ImmutableMap.of(
+ "Attempt", attempt.longValue(), "Status", e.getErrorCode().toString()));
throw e;
}
};
diff --git a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/spi/v1/GapicSpannerRpc.java b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/spi/v1/GapicSpannerRpc.java
index a8618af5c79..c9aa5987663 100644
--- a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/spi/v1/GapicSpannerRpc.java
+++ b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/spi/v1/GapicSpannerRpc.java
@@ -339,7 +339,7 @@ public GapicSpannerRpc(final SpannerOptions options) {
SpannerInterceptorProvider.create(
MoreObjects.firstNonNull(
options.getInterceptorProvider(),
- SpannerInterceptorProvider.createDefault()))
+ SpannerInterceptorProvider.createDefault(options.getOpenTelemetry())))
// This sets the response compressor (Server -> Client).
.withEncoding(compressorName))
.setHeaderProvider(headerProviderWithUserAgent)
diff --git a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/spi/v1/HeaderInterceptor.java b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/spi/v1/HeaderInterceptor.java
index d4725b25427..7de63dc33ba 100644
--- a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/spi/v1/HeaderInterceptor.java
+++ b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/spi/v1/HeaderInterceptor.java
@@ -22,6 +22,7 @@
import static com.google.cloud.spanner.spi.v1.SpannerRpcViews.SPANNER_GFE_HEADER_MISSING_COUNT;
import static com.google.cloud.spanner.spi.v1.SpannerRpcViews.SPANNER_GFE_LATENCY;
+import com.google.cloud.spanner.SpannerRpcMetrics;
import io.grpc.CallOptions;
import io.grpc.Channel;
import io.grpc.ClientCall;
@@ -37,6 +38,8 @@
import io.opencensus.tags.TagValue;
import io.opencensus.tags.Tagger;
import io.opencensus.tags.Tags;
+import io.opentelemetry.api.common.Attributes;
+import io.opentelemetry.api.common.AttributesBuilder;
import java.util.logging.Level;
import java.util.logging.Logger;
import java.util.regex.Matcher;
@@ -63,8 +66,23 @@ class HeaderInterceptor implements ClientInterceptor {
private static final Logger LOGGER = Logger.getLogger(HeaderInterceptor.class.getName());
private static final Level LEVEL = Level.INFO;
+ private final SpannerRpcMetrics spannerRpcMetrics;
- HeaderInterceptor() {}
+ HeaderInterceptor(SpannerRpcMetrics spannerRpcMetrics) {
+ this.spannerRpcMetrics = spannerRpcMetrics;
+ }
+
+ private class SpannerProperties {
+ String projectId;
+ String instanceId;
+ String databaseId;
+
+ SpannerProperties(String projectId, String instanceId, String databaseId) {
+ this.databaseId = databaseId;
+ this.instanceId = instanceId;
+ this.projectId = projectId;
+ }
+ }
@Override
public ClientCall interceptCall(
@@ -72,13 +90,14 @@ public ClientCall interceptCall(
return new SimpleForwardingClientCall(next.newCall(method, callOptions)) {
@Override
public void start(Listener responseListener, Metadata headers) {
- TagContext tagContext = getTagContext(headers, method.getFullMethodName());
+ SpannerProperties spannerProperties = createProjectPropertes(headers);
+ TagContext tagContext = getTagContext(method.getFullMethodName(), spannerProperties);
+ Attributes attributes = getMetricAttributes(method.getFullMethodName(), spannerProperties);
super.start(
new SimpleForwardingClientCallListener(responseListener) {
@Override
public void onHeaders(Metadata metadata) {
-
- processHeader(metadata, tagContext);
+ processHeader(metadata, tagContext, attributes);
super.onHeaders(metadata);
}
},
@@ -87,7 +106,7 @@ public void onHeaders(Metadata metadata) {
};
}
- private void processHeader(Metadata metadata, TagContext tagContext) {
+ private void processHeader(Metadata metadata, TagContext tagContext, Attributes attributes) {
MeasureMap measureMap = STATS_RECORDER.newMeasureMap();
if (metadata.get(SERVER_TIMING_HEADER_KEY) != null) {
String serverTiming = metadata.get(SERVER_TIMING_HEADER_KEY);
@@ -98,27 +117,20 @@ private void processHeader(Metadata metadata, TagContext tagContext) {
measureMap.put(SPANNER_GFE_LATENCY, latency);
measureMap.put(SPANNER_GFE_HEADER_MISSING_COUNT, 0L);
measureMap.record(tagContext);
+
+ spannerRpcMetrics.recordGfeLatency(latency, attributes);
+ spannerRpcMetrics.recordGfeHeaderMissingCount(0L, attributes);
} catch (NumberFormatException e) {
LOGGER.log(LEVEL, "Invalid server-timing object in header", matcher.group("dur"));
}
}
} else {
+ spannerRpcMetrics.recordGfeHeaderMissingCount(1L, attributes);
measureMap.put(SPANNER_GFE_HEADER_MISSING_COUNT, 1L).record(tagContext);
}
}
- private TagContext getTagContext(
- String method, String projectId, String instanceId, String databaseId) {
- return TAGGER
- .currentBuilder()
- .putLocal(PROJECT_ID, TagValue.create(projectId))
- .putLocal(INSTANCE_ID, TagValue.create(instanceId))
- .putLocal(DATABASE_ID, TagValue.create(databaseId))
- .putLocal(METHOD, TagValue.create(method))
- .build();
- }
-
- private TagContext getTagContext(Metadata headers, String method) {
+ private SpannerProperties createProjectPropertes(Metadata headers) {
String projectId = "undefined-project";
String instanceId = "undefined-database";
String databaseId = "undefined-database";
@@ -137,6 +149,26 @@ private TagContext getTagContext(Metadata headers, String method) {
LOGGER.log(LEVEL, "Error parsing google cloud resource header: " + googleResourcePrefix);
}
}
- return getTagContext(method, projectId, instanceId, databaseId);
+ return new SpannerProperties(projectId, instanceId, databaseId);
+ }
+
+ private TagContext getTagContext(String method, SpannerProperties spannerProperties) {
+ return TAGGER
+ .currentBuilder()
+ .putLocal(PROJECT_ID, TagValue.create(spannerProperties.projectId))
+ .putLocal(INSTANCE_ID, TagValue.create(spannerProperties.instanceId))
+ .putLocal(DATABASE_ID, TagValue.create(spannerProperties.databaseId))
+ .putLocal(METHOD, TagValue.create(method))
+ .build();
+ }
+
+ private Attributes getMetricAttributes(String method, SpannerProperties spannerProperties) {
+ AttributesBuilder attributesBuilder = Attributes.builder();
+ attributesBuilder.put("database", spannerProperties.databaseId);
+ attributesBuilder.put("instance_id", spannerProperties.instanceId);
+ attributesBuilder.put("project_id", spannerProperties.projectId);
+ attributesBuilder.put("method", method);
+
+ return attributesBuilder.build();
}
}
diff --git a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/spi/v1/SpannerInterceptorProvider.java b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/spi/v1/SpannerInterceptorProvider.java
index e45702de8df..9b1a2fd3c1f 100644
--- a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/spi/v1/SpannerInterceptorProvider.java
+++ b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/spi/v1/SpannerInterceptorProvider.java
@@ -16,9 +16,14 @@
package com.google.cloud.spanner.spi.v1;
import com.google.api.core.InternalApi;
+import com.google.api.core.ObsoleteApi;
import com.google.api.gax.grpc.GrpcInterceptorProvider;
+import com.google.cloud.spanner.SpannerRpcMetrics;
import com.google.common.collect.ImmutableList;
import io.grpc.ClientInterceptor;
+import io.opentelemetry.api.GlobalOpenTelemetry;
+import io.opentelemetry.api.OpenTelemetry;
+import java.util.ArrayList;
import java.util.List;
import java.util.logging.Level;
import java.util.logging.Logger;
@@ -29,21 +34,24 @@
*/
@InternalApi("Exposed for testing")
public class SpannerInterceptorProvider implements GrpcInterceptorProvider {
-
- private static final List defaultInterceptors =
- ImmutableList.of(
- new SpannerErrorInterceptor(),
- new LoggingInterceptor(Logger.getLogger(GapicSpannerRpc.class.getName()), Level.FINER),
- new HeaderInterceptor());
-
private final List clientInterceptors;
private SpannerInterceptorProvider(List clientInterceptors) {
this.clientInterceptors = clientInterceptors;
}
+ @ObsoleteApi("This method always uses Global OpenTelemetry")
public static SpannerInterceptorProvider createDefault() {
- return new SpannerInterceptorProvider(defaultInterceptors);
+ return createDefault(GlobalOpenTelemetry.get());
+ }
+
+ public static SpannerInterceptorProvider createDefault(OpenTelemetry openTelemetry) {
+ List defaultInterceptorList = new ArrayList<>();
+ defaultInterceptorList.add(new SpannerErrorInterceptor());
+ defaultInterceptorList.add(
+ new LoggingInterceptor(Logger.getLogger(GapicSpannerRpc.class.getName()), Level.FINER));
+ defaultInterceptorList.add(new HeaderInterceptor(new SpannerRpcMetrics(openTelemetry)));
+ return new SpannerInterceptorProvider(ImmutableList.copyOf(defaultInterceptorList));
}
static SpannerInterceptorProvider create(GrpcInterceptorProvider provider) {
diff --git a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/spi/v1/SpannerRpcViews.java b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/spi/v1/SpannerRpcViews.java
index b1b68e95d1a..7d6cc163b46 100644
--- a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/spi/v1/SpannerRpcViews.java
+++ b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/spi/v1/SpannerRpcViews.java
@@ -15,6 +15,8 @@
*/
package com.google.cloud.spanner.spi.v1;
+import com.google.api.core.ObsoleteApi;
+import com.google.cloud.spanner.SpannerOptions;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.ImmutableList;
import io.opencensus.stats.Aggregation;
@@ -91,28 +93,49 @@ public class SpannerRpcViews {
* measures the latency between Google's network receives an RPC and reads back the first byte of
* the response. gfe_header_missing_count is a counter of the number of RPC responses without a
* server-timing header.
+ *
+ * @deprecated The OpenCensus project is deprecated. Use OpenTelemetry to get gfe_latency and
+ * gfe_header_missing_count metrics.
*/
@VisibleForTesting
+ @ObsoleteApi(
+ "The OpenCensus project is deprecated. Use OpenTelemetry to get gfe_latency and gfe_header_missing_count metrics.")
public static void registerGfeLatencyAndHeaderMissingCountViews() {
- viewManager.registerView(SPANNER_GFE_LATENCY_VIEW);
- viewManager.registerView(SPANNER_GFE_HEADER_MISSING_COUNT_VIEW);
+ if (SpannerOptions.isEnabledOpenCensusMetrics()) {
+ viewManager.registerView(SPANNER_GFE_LATENCY_VIEW);
+ viewManager.registerView(SPANNER_GFE_HEADER_MISSING_COUNT_VIEW);
+ }
}
/**
* Register GFE Latency view. gfe_latency measures the latency between Google's network receives
* an RPC and reads back the first byte of the response.
+ *
+ * @deprecated The OpenCensus project is deprecated. Use OpenTelemetry to get gfe_latency and
+ * gfe_header_missing_count metrics.
*/
@VisibleForTesting
+ @ObsoleteApi(
+ "The OpenCensus project is deprecated. Use OpenTelemetry to get gfe_latency and gfe_header_missing_count metrics.")
public static void registerGfeLatencyView() {
- viewManager.registerView(SPANNER_GFE_LATENCY_VIEW);
+ if (SpannerOptions.isEnabledOpenCensusMetrics()) {
+ viewManager.registerView(SPANNER_GFE_LATENCY_VIEW);
+ }
}
/**
* Register GFE Header Missing Count view. gfe_header_missing_count is a counter of the number of
* RPC responses without a server-timing header.
+ *
+ * @deprecated The OpenCensus project is deprecated. Use OpenTelemetry to get gfe_latency and
+ * gfe_header_missing_count metrics.
*/
@VisibleForTesting
+ @ObsoleteApi(
+ "The OpenCensus project is deprecated. Use OpenTelemetry to get gfe_latency and gfe_header_missing_count metrics.")
public static void registerGfeHeaderMissingCountView() {
- viewManager.registerView(SPANNER_GFE_HEADER_MISSING_COUNT_VIEW);
+ if (SpannerOptions.isEnabledOpenCensusMetrics()) {
+ viewManager.registerView(SPANNER_GFE_HEADER_MISSING_COUNT_VIEW);
+ }
}
}
diff --git a/google-cloud-spanner/src/test/java/com/google/cloud/spanner/AsyncTransactionManagerImplTest.java b/google-cloud-spanner/src/test/java/com/google/cloud/spanner/AsyncTransactionManagerImplTest.java
index 84f7fb0db9b..08d22dd2d67 100644
--- a/google-cloud-spanner/src/test/java/com/google/cloud/spanner/AsyncTransactionManagerImplTest.java
+++ b/google-cloud-spanner/src/test/java/com/google/cloud/spanner/AsyncTransactionManagerImplTest.java
@@ -22,7 +22,8 @@
import com.google.api.core.ApiFutures;
import com.google.cloud.Timestamp;
-import io.opencensus.trace.Span;
+import io.opentelemetry.api.trace.Span;
+import io.opentelemetry.context.Scope;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.mockito.Mock;
@@ -36,8 +37,11 @@ public class AsyncTransactionManagerImplTest {
@Test
public void testCommitReturnsCommitStats() {
+ Span oTspan = mock(Span.class);
+ ISpan span = new OpenTelemetrySpan(oTspan);
+ when(oTspan.makeCurrent()).thenReturn(mock(Scope.class));
try (AsyncTransactionManagerImpl manager =
- new AsyncTransactionManagerImpl(session, mock(Span.class), Options.commitStats())) {
+ new AsyncTransactionManagerImpl(session, span, Options.commitStats())) {
when(session.newTransaction(Options.fromTransactionOptions(Options.commitStats())))
.thenReturn(transaction);
when(transaction.ensureTxnAsync()).thenReturn(ApiFutures.immediateFuture(null));
diff --git a/google-cloud-spanner/src/test/java/com/google/cloud/spanner/BatchClientImplTest.java b/google-cloud-spanner/src/test/java/com/google/cloud/spanner/BatchClientImplTest.java
index 18ae8a07b35..b7c4834044a 100644
--- a/google-cloud-spanner/src/test/java/com/google/cloud/spanner/BatchClientImplTest.java
+++ b/google-cloud-spanner/src/test/java/com/google/cloud/spanner/BatchClientImplTest.java
@@ -35,9 +35,11 @@
import com.google.protobuf.util.Timestamps;
import com.google.spanner.v1.Session;
import com.google.spanner.v1.Transaction;
+import io.opentelemetry.api.OpenTelemetry;
import java.util.Collections;
import java.util.Map;
import org.junit.Before;
+import org.junit.BeforeClass;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.JUnit4;
@@ -62,6 +64,12 @@ public final class BatchClientImplTest {
private BatchClient client;
+ @BeforeClass
+ public static void setupOpenTelemetry() {
+ SpannerOptions.resetActiveTracingFramework();
+ SpannerOptions.enableOpenTelemetryTraces();
+ }
+
@SuppressWarnings("unchecked")
@Before
public void setUp() {
@@ -74,6 +82,7 @@ public void setUp() {
when(spannerOptions.getClock()).thenReturn(NanoClock.getDefaultClock());
when(spannerOptions.getSpannerRpcV1()).thenReturn(gapicRpc);
when(spannerOptions.getSessionLabels()).thenReturn(Collections.emptyMap());
+ when(spannerOptions.getOpenTelemetry()).thenReturn(OpenTelemetry.noop());
GrpcTransportOptions transportOptions = mock(GrpcTransportOptions.class);
when(transportOptions.getExecutorFactory()).thenReturn(mock(ExecutorFactory.class));
when(spannerOptions.getTransportOptions()).thenReturn(transportOptions);
diff --git a/google-cloud-spanner/src/test/java/com/google/cloud/spanner/DatabaseClientImplTest.java b/google-cloud-spanner/src/test/java/com/google/cloud/spanner/DatabaseClientImplTest.java
index e527660cfa8..ccbf3c0b2b9 100644
--- a/google-cloud-spanner/src/test/java/com/google/cloud/spanner/DatabaseClientImplTest.java
+++ b/google-cloud-spanner/src/test/java/com/google/cloud/spanner/DatabaseClientImplTest.java
@@ -102,6 +102,8 @@
import io.grpc.StatusRuntimeException;
import io.grpc.inprocess.InProcessServerBuilder;
import io.grpc.protobuf.lite.ProtoLiteUtils;
+import io.opencensus.trace.Tracing;
+import io.opentelemetry.api.OpenTelemetry;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
@@ -3375,7 +3377,10 @@ public void testReadWriteTransaction_usesOptions() {
when(pool.getSession()).thenReturn(session);
TransactionOption option = mock(TransactionOption.class);
- DatabaseClientImpl client = new DatabaseClientImpl(pool);
+ TraceWrapper traceWrapper =
+ new TraceWrapper(Tracing.getTracer(), OpenTelemetry.noop().getTracer(""));
+
+ DatabaseClientImpl client = new DatabaseClientImpl(pool, traceWrapper);
client.readWriteTransaction(option);
verify(session).readWriteTransaction(option);
@@ -3388,7 +3393,7 @@ public void testTransactionManager_usesOptions() {
when(pool.getSession()).thenReturn(session);
TransactionOption option = mock(TransactionOption.class);
- DatabaseClientImpl client = new DatabaseClientImpl(pool);
+ DatabaseClientImpl client = new DatabaseClientImpl(pool, mock(TraceWrapper.class));
try (TransactionManager ignore = client.transactionManager(option)) {
verify(session).transactionManager(option);
}
@@ -3401,7 +3406,7 @@ public void testRunAsync_usesOptions() {
when(pool.getSession()).thenReturn(session);
TransactionOption option = mock(TransactionOption.class);
- DatabaseClientImpl client = new DatabaseClientImpl(pool);
+ DatabaseClientImpl client = new DatabaseClientImpl(pool, mock(TraceWrapper.class));
client.runAsync(option);
verify(session).runAsync(option);
@@ -3414,7 +3419,7 @@ public void testTransactionManagerAsync_usesOptions() {
when(pool.getSession()).thenReturn(session);
TransactionOption option = mock(TransactionOption.class);
- DatabaseClientImpl client = new DatabaseClientImpl(pool);
+ DatabaseClientImpl client = new DatabaseClientImpl(pool, mock(TraceWrapper.class));
try (AsyncTransactionManager ignore = client.transactionManagerAsync(option)) {
verify(session).transactionManagerAsync(option);
}
diff --git a/google-cloud-spanner/src/test/java/com/google/cloud/spanner/FailOnOverkillTraceComponentImpl.java b/google-cloud-spanner/src/test/java/com/google/cloud/spanner/FailOnOverkillTraceComponentImpl.java
index 4877c60837f..4d9b4ddd805 100644
--- a/google-cloud-spanner/src/test/java/com/google/cloud/spanner/FailOnOverkillTraceComponentImpl.java
+++ b/google-cloud-spanner/src/test/java/com/google/cloud/spanner/FailOnOverkillTraceComponentImpl.java
@@ -29,6 +29,7 @@
import io.opencensus.trace.SpanBuilder;
import io.opencensus.trace.SpanContext;
import io.opencensus.trace.SpanId;
+import io.opencensus.trace.Status;
import io.opencensus.trace.TraceComponent;
import io.opencensus.trace.TraceId;
import io.opencensus.trace.TraceOptions;
@@ -43,6 +44,7 @@
import io.opencensus.trace.propagation.BinaryFormat;
import io.opencensus.trace.propagation.PropagationComponent;
import io.opencensus.trace.propagation.TextFormat;
+import java.util.ArrayList;
import java.util.EnumSet;
import java.util.LinkedHashMap;
import java.util.List;
@@ -62,6 +64,8 @@ public class FailOnOverkillTraceComponentImpl extends TraceComponent {
private final TraceConfig traceConfig = new TestTraceConfig();
private static final Map spans = new LinkedHashMap<>();
+ private static final List annotations = new ArrayList<>();
+
public static class TestSpan extends Span {
@GuardedBy("this")
private volatile boolean ended = false;
@@ -75,14 +79,27 @@ private TestSpan(String spanName, SpanContext context, EnumSet options)
}
@Override
- public void addAnnotation(String description, Map attributes) {}
+ public void addAnnotation(String description, Map attributes) {
+ annotations.add(description);
+ }
+
+ @Override
+ public void addAnnotation(Annotation annotation) {
+ annotations.add(annotation.getDescription());
+ }
+
+ @Override
+ public void putAttributes(Map attributes) {}
@Override
- public void addAnnotation(Annotation annotation) {}
+ public void addAttributes(Map attributes) {}
@Override
public void addLink(Link link) {}
+ @Override
+ public void setStatus(Status status) {}
+
@Override
public void end(EndSpanOptions options) {
synchronized (this) {
@@ -210,10 +227,18 @@ Map getSpans() {
return spans;
}
+ List getAnnotations() {
+ return annotations;
+ }
+
void clearSpans() {
spans.clear();
}
+ void clearAnnotations() {
+ annotations.clear();
+ }
+
@Override
public PropagationComponent getPropagationComponent() {
return propagationComponent;
diff --git a/google-cloud-spanner/src/test/java/com/google/cloud/spanner/ITSessionPoolIntegrationTest.java b/google-cloud-spanner/src/test/java/com/google/cloud/spanner/ITSessionPoolIntegrationTest.java
index 36b9bee0af7..be9f6841f20 100644
--- a/google-cloud-spanner/src/test/java/com/google/cloud/spanner/ITSessionPoolIntegrationTest.java
+++ b/google-cloud-spanner/src/test/java/com/google/cloud/spanner/ITSessionPoolIntegrationTest.java
@@ -20,6 +20,8 @@
import com.google.cloud.grpc.GrpcTransportOptions.ExecutorFactory;
import com.google.cloud.spanner.SessionPool.PooledSessionFuture;
+import io.opencensus.trace.Tracing;
+import io.opentelemetry.api.OpenTelemetry;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CountDownLatch;
@@ -93,7 +95,9 @@ public ScheduledExecutorService get() {
return new ScheduledThreadPoolExecutor(2);
}
},
- ((SpannerImpl) env.getTestHelper().getClient()).getSessionClient(db.getId()));
+ ((SpannerImpl) env.getTestHelper().getClient()).getSessionClient(db.getId()),
+ new TraceWrapper(Tracing.getTracer(), OpenTelemetry.noop().getTracer("")),
+ OpenTelemetry.noop());
}
@Test
diff --git a/google-cloud-spanner/src/test/java/com/google/cloud/spanner/IntegrationTestWithClosedSessionsEnv.java b/google-cloud-spanner/src/test/java/com/google/cloud/spanner/IntegrationTestWithClosedSessionsEnv.java
index 0fa5ff27d82..62a25f9dc4d 100644
--- a/google-cloud-spanner/src/test/java/com/google/cloud/spanner/IntegrationTestWithClosedSessionsEnv.java
+++ b/google-cloud-spanner/src/test/java/com/google/cloud/spanner/IntegrationTestWithClosedSessionsEnv.java
@@ -46,7 +46,7 @@ private static class SpannerWithClosedSessionsImpl extends SpannerImpl {
@Override
DatabaseClientImpl createDatabaseClient(String clientId, SessionPool pool) {
- return new DatabaseClientWithClosedSessionImpl(clientId, pool);
+ return new DatabaseClientWithClosedSessionImpl(clientId, pool, tracer);
}
}
@@ -58,8 +58,8 @@ public static class DatabaseClientWithClosedSessionImpl extends DatabaseClientIm
private boolean invalidateNextSession = false;
private boolean allowReplacing = true;
- DatabaseClientWithClosedSessionImpl(String clientId, SessionPool pool) {
- super(clientId, pool);
+ DatabaseClientWithClosedSessionImpl(String clientId, SessionPool pool, TraceWrapper tracer) {
+ super(clientId, pool, tracer);
}
/** Invalidate the next session that is checked out from the pool. */
diff --git a/google-cloud-spanner/src/test/java/com/google/cloud/spanner/OpenTelemetrySpanTest.java b/google-cloud-spanner/src/test/java/com/google/cloud/spanner/OpenTelemetrySpanTest.java
new file mode 100644
index 00000000000..956607e0231
--- /dev/null
+++ b/google-cloud-spanner/src/test/java/com/google/cloud/spanner/OpenTelemetrySpanTest.java
@@ -0,0 +1,563 @@
+/*
+ * Copyright 2023 Google LLC
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.google.cloud.spanner;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertThrows;
+
+import com.google.api.gax.grpc.testing.LocalChannelProvider;
+import com.google.cloud.NoCredentials;
+import com.google.cloud.spanner.MockSpannerServiceImpl.StatementResult;
+import com.google.common.collect.ImmutableList;
+import com.google.protobuf.ListValue;
+import com.google.spanner.v1.ResultSetMetadata;
+import com.google.spanner.v1.StructType;
+import com.google.spanner.v1.StructType.Field;
+import com.google.spanner.v1.TypeCode;
+import io.grpc.Server;
+import io.grpc.Status;
+import io.grpc.inprocess.InProcessServerBuilder;
+import io.opencensus.trace.Tracing;
+import io.opentelemetry.api.OpenTelemetry;
+import io.opentelemetry.api.trace.propagation.W3CTraceContextPropagator;
+import io.opentelemetry.context.propagation.ContextPropagators;
+import io.opentelemetry.sdk.OpenTelemetrySdk;
+import io.opentelemetry.sdk.testing.exporter.InMemorySpanExporter;
+import io.opentelemetry.sdk.trace.SdkTracerProvider;
+import io.opentelemetry.sdk.trace.data.EventData;
+import io.opentelemetry.sdk.trace.data.SpanData;
+import io.opentelemetry.sdk.trace.export.SimpleSpanProcessor;
+import java.lang.reflect.Modifier;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.stream.Collectors;
+import org.junit.After;
+import org.junit.AfterClass;
+import org.junit.Assume;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+
+@Category(TracerTest.class)
+@RunWith(JUnit4.class)
+public class OpenTelemetrySpanTest {
+
+ private static final String TEST_PROJECT = "my-project";
+ private static final String TEST_INSTANCE = "my-instance";
+ private static final String TEST_DATABASE = "my-database";
+ private static LocalChannelProvider channelProvider;
+ private static MockSpannerServiceImpl mockSpanner;
+ private Spanner spanner;
+ private DatabaseClient client;
+ private static Server server;
+ private static InMemorySpanExporter spanExporter;
+
+ private static FailOnOverkillTraceComponentImpl failOnOverkillTraceComponent =
+ new FailOnOverkillTraceComponentImpl();
+
+ private static final Statement SELECT1 = Statement.of("SELECT 1 AS COL1");
+
+ private static final ResultSetMetadata SELECT1_METADATA =
+ ResultSetMetadata.newBuilder()
+ .setRowType(
+ StructType.newBuilder()
+ .addFields(
+ Field.newBuilder()
+ .setName("COL1")
+ .setType(
+ com.google.spanner.v1.Type.newBuilder()
+ .setCode(TypeCode.INT64)
+ .build())
+ .build())
+ .build())
+ .build();
+ private static final com.google.spanner.v1.ResultSet SELECT1_RESULTSET =
+ com.google.spanner.v1.ResultSet.newBuilder()
+ .addRows(
+ ListValue.newBuilder()
+ .addValues(com.google.protobuf.Value.newBuilder().setStringValue("1").build())
+ .build())
+ .setMetadata(SELECT1_METADATA)
+ .build();
+ private static final Statement UPDATE_STATEMENT =
+ Statement.of("UPDATE FOO SET BAR=1 WHERE BAZ=2");
+ private static final long UPDATE_COUNT = 1L;
+ private static final Statement INVALID_UPDATE_STATEMENT =
+ Statement.of("UPDATE NON_EXISTENT_TABLE SET BAR=1 WHERE BAZ=2");
+
+ private List expectedBatchCreateSessionsRequestEvents =
+ ImmutableList.of("Requesting 25 sessions", "Request for 25 sessions returned 25 sessions");
+
+ private int expectedBatchCreateSessionsRequestEventsCount = 2;
+
+ private List expectedBatchCreateSessionsEvents = ImmutableList.of("Creating 25 sessions");
+
+ private int expectedBatchCreateSessionsEventsCount = 1;
+
+ private List expectedExecuteStreamingQueryEvents =
+ ImmutableList.of("Starting/Resuming stream");
+
+ private int expectedExecuteStreamingQueryEventsCount = 1;
+
+ private List expectedReadOnlyTransactionSingleUseEvents =
+ ImmutableList.of(
+ "Acquiring session",
+ "No session available",
+ "Creating sessions",
+ "Waiting for a session to come available",
+ "Using Session");
+
+ private int expectedReadOnlyTransactionSingleUseEventsCount = 5;
+
+ private List expectedReadOnlyTransactionMultiUseEvents =
+ ImmutableList.of(
+ "Acquiring session",
+ "No session available",
+ "Creating sessions",
+ "Waiting for a session to come available",
+ "Using Session",
+ "Creating Transaction",
+ "Transaction Creation Done");
+
+ private int expectedReadOnlyTransactionMultiUseEventsCount = 7;
+
+ private List expectedReadWriteTransactionErrorEvents =
+ ImmutableList.of(
+ "Acquiring session",
+ "No session available",
+ "Creating sessions",
+ "Waiting for a session to come available",
+ "Using Session",
+ "Starting Transaction Attempt",
+ "Transaction Attempt Failed in user operation",
+ "exception");
+
+ private int expectedReadWriteTransactionErrorEventsCount = 8;
+ private List expectedReadWriteTransactionEvents =
+ ImmutableList.of(
+ "Acquiring session",
+ "No session available",
+ "Creating sessions",
+ "Waiting for a session to come available",
+ "Using Session",
+ "Starting Transaction Attempt",
+ "Starting Commit",
+ "Commit Done",
+ "Transaction Attempt Succeeded");
+
+ private int expectedReadWriteTransactionCount = 9;
+ private List expectedReadWriteTransactionErrorWithBeginTransactionEvents =
+ ImmutableList.of(
+ "Acquiring session",
+ "No session available",
+ "Creating sessions",
+ "Waiting for a session to come available",
+ "Using Session",
+ "Starting Transaction Attempt",
+ "Transaction Attempt Aborted in user operation. Retrying",
+ "Creating Transaction",
+ "Transaction Creation Done",
+ "Starting Commit",
+ "Commit Done",
+ "Transaction Attempt Succeeded");
+
+ private int expectedReadWriteTransactionErrorWithBeginTransactionEventsCount = 13;
+ private List expectedReadOnlyTransactionSpans =
+ ImmutableList.of(
+ "CloudSpannerOperation.BatchCreateSessionsRequest",
+ "CloudSpannerOperation.ExecuteStreamingQuery",
+ "CloudSpannerOperation.BatchCreateSessions",
+ "CloudSpanner.ReadOnlyTransaction",
+ "SessionPool.WaitForSession");
+
+ private List expectedReadWriteTransactionWithCommitSpans =
+ ImmutableList.of(
+ "CloudSpannerOperation.BatchCreateSessionsRequest",
+ "CloudSpannerOperation.Commit",
+ "CloudSpannerOperation.BatchCreateSessions",
+ "CloudSpanner.ReadWriteTransaction",
+ "SessionPool.WaitForSession");
+
+ private List expectedReadWriteTransactionSpans =
+ ImmutableList.of(
+ "CloudSpannerOperation.BatchCreateSessionsRequest",
+ "CloudSpannerOperation.BatchCreateSessions",
+ "CloudSpanner.ReadWriteTransaction",
+ "SessionPool.WaitForSession");
+
+ private List expectedReadWriteTransactionWithCommitAndBeginTransactionSpans =
+ ImmutableList.of(
+ "CloudSpannerOperation.BeginTransaction",
+ "CloudSpannerOperation.BatchCreateSessionsRequest",
+ "CloudSpannerOperation.Commit",
+ "CloudSpannerOperation.BatchCreateSessions",
+ "CloudSpanner.ReadWriteTransaction",
+ "SessionPool.WaitForSession");
+
+ @BeforeClass
+ public static void setupOpenTelemetry() {
+ SpannerOptions.resetActiveTracingFramework();
+ SpannerOptions.enableOpenTelemetryTraces();
+ }
+
+ @BeforeClass
+ public static void startStaticServer() throws Exception {
+ // Incorporating OpenCensus tracer to ensure that OpenTraces traces are utilized if enabled,
+ // regardless of the presence of OpenCensus tracer.
+ java.lang.reflect.Field field = Tracing.class.getDeclaredField("traceComponent");
+ field.setAccessible(true);
+ java.lang.reflect.Field modifiersField = null;
+ try {
+ modifiersField = java.lang.reflect.Field.class.getDeclaredField("modifiers");
+ } catch (NoSuchFieldException e) {
+ // Halt the test and ignore it.
+ Assume.assumeTrue(
+ "Skipping test as reflection is not allowed on reflection class in this JDK build",
+ false);
+ }
+ modifiersField.setAccessible(true);
+ // Remove the final modifier from the 'traceComponent' field.
+ modifiersField.setInt(field, field.getModifiers() & ~Modifier.FINAL);
+ field.set(null, failOnOverkillTraceComponent);
+
+ mockSpanner = new MockSpannerServiceImpl();
+ mockSpanner.setAbortProbability(0.0D); // We don't want any unpredictable aborted transactions.
+ mockSpanner.putStatementResult(StatementResult.query(SELECT1, SELECT1_RESULTSET));
+ mockSpanner.putStatementResult(StatementResult.update(UPDATE_STATEMENT, UPDATE_COUNT));
+ mockSpanner.putStatementResult(
+ StatementResult.exception(
+ INVALID_UPDATE_STATEMENT,
+ Status.INVALID_ARGUMENT.withDescription("invalid statement").asRuntimeException()));
+ String uniqueName = InProcessServerBuilder.generateName();
+ server = InProcessServerBuilder.forName(uniqueName).addService(mockSpanner).build().start();
+
+ channelProvider = LocalChannelProvider.create(uniqueName);
+ failOnOverkillTraceComponent.clearSpans();
+ failOnOverkillTraceComponent.clearAnnotations();
+ }
+
+ @AfterClass
+ public static void stopServer() throws InterruptedException {
+ if (server != null) {
+ server.shutdown();
+ server.awaitTermination();
+ }
+ }
+
+ @Before
+ public void setUp() throws Exception {
+ spanExporter = InMemorySpanExporter.create();
+
+ SdkTracerProvider tracerProvider =
+ SdkTracerProvider.builder()
+ .addSpanProcessor(SimpleSpanProcessor.create(spanExporter))
+ .build();
+
+ OpenTelemetry openTelemetry =
+ OpenTelemetrySdk.builder()
+ .setPropagators(ContextPropagators.create(W3CTraceContextPropagator.getInstance()))
+ .setTracerProvider(tracerProvider)
+ .build();
+
+ SpannerOptions.Builder builder =
+ SpannerOptions.newBuilder()
+ .setProjectId(TEST_PROJECT)
+ .setChannelProvider(channelProvider)
+ .setOpenTelemetry(openTelemetry)
+ .setCredentials(NoCredentials.getInstance())
+ .setSessionPoolOption(SessionPoolOptions.newBuilder().setMinSessions(0).build());
+
+ spanner = builder.build().getService();
+
+ client = spanner.getDatabaseClient(DatabaseId.of(TEST_PROJECT, TEST_INSTANCE, TEST_DATABASE));
+ }
+
+ @After
+ public void tearDown() {
+ spanner.close();
+ mockSpanner.reset();
+ mockSpanner.removeAllExecutionTimes();
+ spanExporter.reset();
+ }
+
+ @Test
+ public void singleUse() {
+ try (ResultSet rs = client.singleUse().executeQuery(SELECT1)) {
+ while (rs.next()) {
+ // Just consume the result set.
+ }
+ }
+
+ // OpenCensus spans should be 0 as OpenTelemetry is enabled.
+ assertEquals(failOnOverkillTraceComponent.getSpans().size(), 0);
+
+ List actualSpanItems = new ArrayList<>();
+ spanExporter
+ .getFinishedSpanItems()
+ .forEach(
+ spanItem -> {
+ actualSpanItems.add(spanItem.getName());
+ switch (spanItem.getName()) {
+ case "CloudSpannerOperation.BatchCreateSessionsRequest":
+ verifyRequestEvents(
+ spanItem,
+ expectedBatchCreateSessionsRequestEvents,
+ expectedBatchCreateSessionsRequestEventsCount);
+ break;
+ case "CloudSpannerOperation.BatchCreateSessions":
+ verifyRequestEvents(
+ spanItem,
+ expectedBatchCreateSessionsEvents,
+ expectedBatchCreateSessionsEventsCount);
+ break;
+ case "CloudSpannerOperation.ExecuteStreamingQuery":
+ verifyRequestEvents(
+ spanItem,
+ expectedExecuteStreamingQueryEvents,
+ expectedExecuteStreamingQueryEventsCount);
+ break;
+ case "CloudSpanner.ReadOnlyTransaction":
+ verifyRequestEvents(
+ spanItem,
+ expectedReadOnlyTransactionSingleUseEvents,
+ expectedReadOnlyTransactionSingleUseEventsCount);
+ break;
+ case "SessionPool.WaitForSession":
+ assertEquals(0, spanItem.getEvents().size());
+ break;
+ default:
+ assert false;
+ }
+ });
+
+ verifySpans(actualSpanItems, expectedReadOnlyTransactionSpans);
+ }
+
+ @Test
+ public void multiUse() {
+ try (ReadOnlyTransaction tx = client.readOnlyTransaction()) {
+ try (ResultSet rs = tx.executeQuery(SELECT1)) {
+ while (rs.next()) {
+ // Just consume the result set.
+ }
+ }
+ }
+
+ List actualSpanItems = new ArrayList<>();
+ spanExporter
+ .getFinishedSpanItems()
+ .forEach(
+ spanItem -> {
+ actualSpanItems.add(spanItem.getName());
+ switch (spanItem.getName()) {
+ case "CloudSpannerOperation.BatchCreateSessionsRequest":
+ verifyRequestEvents(
+ spanItem,
+ expectedBatchCreateSessionsRequestEvents,
+ expectedBatchCreateSessionsRequestEventsCount);
+ break;
+ case "CloudSpannerOperation.BatchCreateSessions":
+ verifyRequestEvents(
+ spanItem,
+ expectedBatchCreateSessionsEvents,
+ expectedBatchCreateSessionsEventsCount);
+ break;
+ case "CloudSpannerOperation.ExecuteStreamingQuery":
+ verifyRequestEvents(
+ spanItem,
+ expectedExecuteStreamingQueryEvents,
+ expectedExecuteStreamingQueryEventsCount);
+ break;
+ case "SessionPool.WaitForSession":
+ assertEquals(0, spanItem.getEvents().size());
+ break;
+ case "CloudSpanner.ReadOnlyTransaction":
+ verifyRequestEvents(
+ spanItem,
+ expectedReadOnlyTransactionMultiUseEvents,
+ expectedReadOnlyTransactionMultiUseEventsCount);
+ break;
+ default:
+ assert false;
+ }
+ });
+
+ verifySpans(actualSpanItems, expectedReadOnlyTransactionSpans);
+ }
+
+ @Test
+ public void transactionRunner() {
+ TransactionRunner runner = client.readWriteTransaction();
+ runner.run(transaction -> transaction.executeUpdate(UPDATE_STATEMENT));
+
+ List actualSpanItems = new ArrayList<>();
+ spanExporter
+ .getFinishedSpanItems()
+ .forEach(
+ spanItem -> {
+ actualSpanItems.add(spanItem.getName());
+ switch (spanItem.getName()) {
+ case "CloudSpannerOperation.BatchCreateSessionsRequest":
+ verifyRequestEvents(
+ spanItem,
+ expectedBatchCreateSessionsRequestEvents,
+ expectedBatchCreateSessionsRequestEventsCount);
+ break;
+ case "CloudSpannerOperation.BatchCreateSessions":
+ verifyRequestEvents(
+ spanItem,
+ expectedBatchCreateSessionsEvents,
+ expectedBatchCreateSessionsEventsCount);
+ break;
+ case "SessionPool.WaitForSession":
+ case "CloudSpannerOperation.Commit":
+ assertEquals(0, spanItem.getEvents().size());
+ break;
+ case "CloudSpanner.ReadWriteTransaction":
+ verifyRequestEvents(
+ spanItem,
+ expectedReadWriteTransactionEvents,
+ expectedReadWriteTransactionCount);
+ break;
+ default:
+ assert false;
+ }
+ });
+
+ verifySpans(actualSpanItems, expectedReadWriteTransactionWithCommitSpans);
+ }
+
+ @Test
+ public void transactionRunnerWithError() {
+ TransactionRunner runner = client.readWriteTransaction();
+ SpannerException e =
+ assertThrows(
+ SpannerException.class,
+ () -> runner.run(transaction -> transaction.executeUpdate(INVALID_UPDATE_STATEMENT)));
+ assertEquals(ErrorCode.INVALID_ARGUMENT, e.getErrorCode());
+
+ List actualSpanItems = new ArrayList<>();
+ spanExporter
+ .getFinishedSpanItems()
+ .forEach(
+ spanItem -> {
+ actualSpanItems.add(spanItem.getName());
+ switch (spanItem.getName()) {
+ case "CloudSpannerOperation.BatchCreateSessionsRequest":
+ verifyRequestEvents(
+ spanItem,
+ expectedBatchCreateSessionsRequestEvents,
+ expectedBatchCreateSessionsRequestEventsCount);
+ break;
+ case "CloudSpannerOperation.BatchCreateSessions":
+ verifyRequestEvents(
+ spanItem,
+ expectedBatchCreateSessionsEvents,
+ expectedBatchCreateSessionsEventsCount);
+ break;
+ case "SessionPool.WaitForSession":
+ assertEquals(0, spanItem.getEvents().size());
+ break;
+ case "CloudSpanner.ReadWriteTransaction":
+ verifyRequestEvents(
+ spanItem,
+ expectedReadWriteTransactionErrorEvents,
+ expectedReadWriteTransactionErrorEventsCount);
+ break;
+ default:
+ assert false;
+ }
+ });
+
+ verifySpans(actualSpanItems, expectedReadWriteTransactionSpans);
+ }
+
+ @Test
+ public void transactionRunnerWithFailedAndBeginTransaction() {
+ Long updateCount =
+ client
+ .readWriteTransaction()
+ .run(
+ transaction -> {
+ // This update statement carries the BeginTransaction, but fails. This will
+ // cause the entire transaction to be retried with an explicit
+ // BeginTransaction RPC to ensure all statements in the transaction are
+ // actually executed against the same transaction.
+ SpannerException e =
+ assertThrows(
+ SpannerException.class,
+ () -> transaction.executeUpdate(INVALID_UPDATE_STATEMENT));
+ assertEquals(ErrorCode.INVALID_ARGUMENT, e.getErrorCode());
+ return transaction.executeUpdate(UPDATE_STATEMENT);
+ });
+
+ List actualSpanItems = new ArrayList<>();
+ spanExporter
+ .getFinishedSpanItems()
+ .forEach(
+ spanItem -> {
+ actualSpanItems.add(spanItem.getName());
+ switch (spanItem.getName()) {
+ case "CloudSpannerOperation.BatchCreateSessionsRequest":
+ verifyRequestEvents(
+ spanItem,
+ expectedBatchCreateSessionsRequestEvents,
+ expectedBatchCreateSessionsRequestEventsCount);
+ break;
+ case "CloudSpannerOperation.BatchCreateSessions":
+ verifyRequestEvents(
+ spanItem,
+ expectedBatchCreateSessionsEvents,
+ expectedBatchCreateSessionsEventsCount);
+ break;
+ case "SessionPool.WaitForSession":
+ case "CloudSpannerOperation.Commit":
+ case "CloudSpannerOperation.BeginTransaction":
+ assertEquals(0, spanItem.getEvents().size());
+ break;
+ case "CloudSpanner.ReadWriteTransaction":
+ verifyRequestEvents(
+ spanItem,
+ expectedReadWriteTransactionErrorWithBeginTransactionEvents,
+ expectedReadWriteTransactionErrorWithBeginTransactionEventsCount);
+ break;
+ default:
+ assert false;
+ }
+ });
+
+ verifySpans(actualSpanItems, expectedReadWriteTransactionWithCommitAndBeginTransactionSpans);
+ }
+
+ private void verifyRequestEvents(SpanData spanItem, List expectedEvents, int eventCount) {
+ List eventNames =
+ spanItem.getEvents().stream().map(EventData::getName).collect(Collectors.toList());
+ assertEquals(eventCount, spanItem.getEvents().size());
+ assertEquals(
+ eventNames.stream().distinct().sorted().collect(Collectors.toList()),
+ expectedEvents.stream().sorted().collect(Collectors.toList()));
+ }
+
+ private static void verifySpans(List actualSpanItems, List expectedSpansItems) {
+ assertEquals(
+ actualSpanItems.stream().distinct().sorted().collect(Collectors.toList()),
+ expectedSpansItems.stream().sorted().collect(Collectors.toList()));
+ }
+}
diff --git a/google-cloud-spanner/src/test/java/com/google/cloud/spanner/ResumableStreamIteratorTest.java b/google-cloud-spanner/src/test/java/com/google/cloud/spanner/ResumableStreamIteratorTest.java
index 06c1725d76a..217e818d42c 100644
--- a/google-cloud-spanner/src/test/java/com/google/cloud/spanner/ResumableStreamIteratorTest.java
+++ b/google-cloud-spanner/src/test/java/com/google/cloud/spanner/ResumableStreamIteratorTest.java
@@ -21,6 +21,7 @@
import static org.junit.Assert.assertThrows;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
import com.google.api.client.util.BackOff;
import com.google.cloud.spanner.AbstractResultSet.ResumableStreamIterator;
@@ -36,8 +37,10 @@
import io.grpc.Status;
import io.grpc.StatusRuntimeException;
import io.grpc.protobuf.ProtoUtils;
-import io.opencensus.trace.EndSpanOptions;
import io.opencensus.trace.Span;
+import io.opencensus.trace.Tracing;
+import io.opentelemetry.api.OpenTelemetry;
+import io.opentelemetry.context.Scope;
import java.io.IOException;
import java.lang.reflect.Field;
import java.util.ArrayList;
@@ -132,15 +135,19 @@ public boolean isWithBeginTransaction() {
@Before
public void setUp() {
+ SpannerOptions.resetActiveTracingFramework();
+ SpannerOptions.enableOpenTelemetryTraces();
initWithLimit(Integer.MAX_VALUE);
}
private void initWithLimit(int maxBufferSize) {
+
resumableStreamIterator =
new AbstractResultSet.ResumableStreamIterator(
maxBufferSize,
"",
- null,
+ new OpenTelemetrySpan(mock(io.opentelemetry.api.trace.Span.class)),
+ new TraceWrapper(Tracing.getTracer(), OpenTelemetry.noop().getTracer("")),
SpannerStubSettings.newBuilder().executeStreamingSqlSettings().getRetrySettings(),
SpannerStubSettings.newBuilder().executeStreamingSqlSettings().getRetryableCodes()) {
@Override
@@ -163,12 +170,39 @@ public void simple() {
}
@Test
- public void closedSpan() {
+ public void closedOTSpan() {
+ SpannerOptions.resetActiveTracingFramework();
+ SpannerOptions.enableOpenTelemetryTraces();
Assume.assumeTrue(
"This test is only supported on JDK11 and lower",
JavaVersionUtil.getJavaMajorVersion() < 12);
- Span span = mock(Span.class);
+ io.opentelemetry.api.trace.Span oTspan = mock(io.opentelemetry.api.trace.Span.class);
+ ISpan span = new OpenTelemetrySpan(oTspan);
+ when(oTspan.makeCurrent()).thenReturn(mock(Scope.class));
+ setInternalState(ResumableStreamIterator.class, this.resumableStreamIterator, "span", span);
+
+ ResultSetStream s1 = Mockito.mock(ResultSetStream.class);
+ Mockito.when(starter.startStream(null)).thenReturn(new ResultSetIterator(s1));
+ Mockito.when(s1.next())
+ .thenReturn(resultSet(ByteString.copyFromUtf8("r1"), "a"))
+ .thenReturn(resultSet(ByteString.copyFromUtf8("r2"), "b"))
+ .thenReturn(null);
+ assertThat(consume(resumableStreamIterator)).containsExactly("a", "b").inOrder();
+
+ resumableStreamIterator.close("closed");
+ verify(oTspan).end();
+ }
+
+ @Test
+ public void closedOCSpan() {
+ SpannerOptions.resetActiveTracingFramework();
+ SpannerOptions.enableOpenCensusTraces();
+ Assume.assumeTrue(
+ "This test is only supported on JDK11 and lower",
+ JavaVersionUtil.getJavaMajorVersion() < 12);
+ Span mockSpan = mock(Span.class);
+ ISpan span = new OpenCensusSpan(mockSpan);
setInternalState(ResumableStreamIterator.class, this.resumableStreamIterator, "span", span);
ResultSetStream s1 = Mockito.mock(ResultSetStream.class);
@@ -180,7 +214,7 @@ public void closedSpan() {
assertThat(consume(resumableStreamIterator)).containsExactly("a", "b").inOrder();
resumableStreamIterator.close("closed");
- verify(span).end(EndSpanOptions.builder().setSampleToLocalSpanStore(true).build());
+ verify(mockSpan).end(OpenCensusSpan.END_SPAN_OPTIONS);
}
@Test
diff --git a/google-cloud-spanner/src/test/java/com/google/cloud/spanner/SessionClientTests.java b/google-cloud-spanner/src/test/java/com/google/cloud/spanner/SessionClientTests.java
index 8af1323238a..c8d9bc39339 100644
--- a/google-cloud-spanner/src/test/java/com/google/cloud/spanner/SessionClientTests.java
+++ b/google-cloud-spanner/src/test/java/com/google/cloud/spanner/SessionClientTests.java
@@ -17,6 +17,8 @@
package com.google.cloud.spanner;
import static com.google.common.truth.Truth.assertThat;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.Mockito.doNothing;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
@@ -27,6 +29,8 @@
import com.google.cloud.spanner.SessionClient.SessionConsumer;
import com.google.cloud.spanner.spi.v1.SpannerRpc;
import com.google.cloud.spanner.spi.v1.SpannerRpc.Option;
+import io.opencensus.trace.Tracing;
+import io.opentelemetry.api.OpenTelemetry;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
@@ -83,6 +87,9 @@ public static Collection