diff --git a/google-cloud-datastore/src/main/java/com/google/cloud/datastore/DatastoreImpl.java b/google-cloud-datastore/src/main/java/com/google/cloud/datastore/DatastoreImpl.java index 66bc6c0ba..753dab596 100644 --- a/google-cloud-datastore/src/main/java/com/google/cloud/datastore/DatastoreImpl.java +++ b/google-cloud-datastore/src/main/java/com/google/cloud/datastore/DatastoreImpl.java @@ -25,6 +25,7 @@ import com.google.cloud.ServiceOptions; import com.google.cloud.datastore.execution.AggregationQueryExecutor; import com.google.cloud.datastore.spi.v1.DatastoreRpc; +import com.google.cloud.datastore.telemetry.TraceUtil.Context; import com.google.common.base.MoreObjects; import com.google.common.base.Preconditions; import com.google.common.collect.AbstractIterator; @@ -52,6 +53,7 @@ import java.util.Optional; import java.util.Set; import java.util.concurrent.Callable; +import javax.annotation.Nonnull; final class DatastoreImpl extends BaseService implements Datastore { @@ -103,13 +105,18 @@ static class ReadWriteTransactionCallable implements Callable { private final TransactionCallable callable; private volatile TransactionOptions options; private volatile Transaction transaction; + @Nonnull private final Context transactionTraceContext; ReadWriteTransactionCallable( - Datastore datastore, TransactionCallable callable, TransactionOptions options) { + Datastore datastore, + TransactionCallable callable, + TransactionOptions options, + @Nonnull Context parentTraceContext) { this.datastore = datastore; this.callable = callable; this.options = options; this.transaction = null; + this.transactionTraceContext = parentTraceContext; } Datastore getDatastore() { @@ -132,8 +139,9 @@ void setPrevTransactionId(ByteString transactionId) { @Override public T call() throws DatastoreException { - transaction = datastore.newTransaction(options); - try { + try (com.google.cloud.datastore.telemetry.TraceUtil.Scope ignored = + transactionTraceContext.makeCurrent()) { + transaction = datastore.newTransaction(options); T value = callable.run(transaction); transaction.commit(); return value; @@ -154,36 +162,41 @@ public T call() throws DatastoreException { @Override public T runInTransaction(final TransactionCallable callable) { - Span span = traceUtil.startSpan(TraceUtil.SPAN_NAME_TRANSACTION); - try (Scope scope = traceUtil.getTracer().withSpan(span)) { + com.google.cloud.datastore.telemetry.TraceUtil.Span span = + otelTraceUtil.startSpan( + com.google.cloud.datastore.telemetry.TraceUtil.SPAN_NAME_TRANSACTION_RUN); + try (com.google.cloud.datastore.telemetry.TraceUtil.Scope ignored = span.makeCurrent()) { return RetryHelper.runWithRetries( - new ReadWriteTransactionCallable(this, callable, null), + new ReadWriteTransactionCallable(this, callable, null, otelTraceUtil.currentContext()), retrySettings, TRANSACTION_EXCEPTION_HANDLER, getOptions().getClock()); } catch (RetryHelperException e) { - span.setStatus(Status.UNKNOWN.withDescription(e.getMessage())); + span.end(e); throw DatastoreException.translateAndThrow(e); } finally { - span.end(TraceUtil.END_SPAN_OPTIONS); + span.end(); } } @Override public T runInTransaction( final TransactionCallable callable, TransactionOptions transactionOptions) { - Span span = traceUtil.startSpan(TraceUtil.SPAN_NAME_TRANSACTION); - try (Scope scope = traceUtil.getTracer().withSpan(span)) { + com.google.cloud.datastore.telemetry.TraceUtil.Span span = + otelTraceUtil.startSpan( + com.google.cloud.datastore.telemetry.TraceUtil.SPAN_NAME_TRANSACTION_RUN); + try (com.google.cloud.datastore.telemetry.TraceUtil.Scope ignored = span.makeCurrent()) { return RetryHelper.runWithRetries( - new ReadWriteTransactionCallable(this, callable, transactionOptions), + new ReadWriteTransactionCallable( + this, callable, transactionOptions, otelTraceUtil.currentContext()), retrySettings, TRANSACTION_EXCEPTION_HANDLER, getOptions().getClock()); } catch (RetryHelperException e) { - span.setStatus(Status.UNKNOWN.withDescription(e.getMessage())); + span.end(e); throw DatastoreException.translateAndThrow(e); } finally { - span.end(TraceUtil.END_SPAN_OPTIONS); + span.end(); } } @@ -634,10 +647,14 @@ private com.google.datastore.v1.CommitResponse commitMutation( com.google.datastore.v1.CommitResponse commit( final com.google.datastore.v1.CommitRequest requestPb) { - com.google.cloud.datastore.telemetry.TraceUtil.Span span = - otelTraceUtil.startSpan(com.google.cloud.datastore.telemetry.TraceUtil.SPAN_NAME_COMMIT); + final boolean isTransactional = + requestPb.hasTransaction() || requestPb.hasSingleUseTransaction(); + final String spanName = + isTransactional + ? com.google.cloud.datastore.telemetry.TraceUtil.SPAN_NAME_TRANSACTION_COMMIT + : com.google.cloud.datastore.telemetry.TraceUtil.SPAN_NAME_COMMIT; + com.google.cloud.datastore.telemetry.TraceUtil.Span span = otelTraceUtil.startSpan(spanName); span.setAttribute("isTransactional", requestPb.hasTransaction()); - try (com.google.cloud.datastore.telemetry.TraceUtil.Scope ignored = span.makeCurrent()) { return RetryHelper.runWithRetries( () -> datastoreRpc.commit(requestPb), diff --git a/google-cloud-datastore/src/main/java/com/google/cloud/datastore/TransactionImpl.java b/google-cloud-datastore/src/main/java/com/google/cloud/datastore/TransactionImpl.java index f08a908ec..b87d175a0 100644 --- a/google-cloud-datastore/src/main/java/com/google/cloud/datastore/TransactionImpl.java +++ b/google-cloud-datastore/src/main/java/com/google/cloud/datastore/TransactionImpl.java @@ -20,6 +20,8 @@ import com.google.api.core.BetaApi; import com.google.cloud.datastore.models.ExplainOptions; +import com.google.cloud.datastore.telemetry.TraceUtil.Context; +import com.google.cloud.datastore.telemetry.TraceUtil.Scope; import com.google.common.collect.ImmutableList; import com.google.datastore.v1.ReadOptions; import com.google.datastore.v1.TransactionOptions; @@ -28,6 +30,7 @@ import java.util.Iterator; import java.util.List; import java.util.Optional; +import javax.annotation.Nonnull; final class TransactionImpl extends BaseDatastoreBatchWriter implements Transaction { @@ -37,6 +40,8 @@ final class TransactionImpl extends BaseDatastoreBatchWriter implements Transact private final ReadOptionProtoPreparer readOptionProtoPreparer; + @Nonnull private final Context transactionTraceContext; + static class ResponseImpl implements Transaction.Response { private final com.google.datastore.v1.CommitResponse response; @@ -78,6 +83,7 @@ public List getGeneratedKeys() { transactionId = datastore.requestTransactionId(requestPb); this.readOptionProtoPreparer = new ReadOptionProtoPreparer(); + this.transactionTraceContext = datastore.getOptions().getTraceUtil().currentContext(); } @Override @@ -96,7 +102,9 @@ public Iterator get(Key... keys) { @Override public List fetch(Key... keys) { validateActive(); - return DatastoreHelper.fetch(this, keys); + try (Scope ignored = transactionTraceContext.makeCurrent()) { + return DatastoreHelper.fetch(this, keys); + } } @Override diff --git a/google-cloud-datastore/src/main/java/com/google/cloud/datastore/telemetry/TraceUtil.java b/google-cloud-datastore/src/main/java/com/google/cloud/datastore/telemetry/TraceUtil.java index d43c36a07..fe0e92161 100644 --- a/google-cloud-datastore/src/main/java/com/google/cloud/datastore/telemetry/TraceUtil.java +++ b/google-cloud-datastore/src/main/java/com/google/cloud/datastore/telemetry/TraceUtil.java @@ -21,6 +21,7 @@ import com.google.api.core.InternalExtensionOnly; import com.google.cloud.datastore.DatastoreOptions; import io.grpc.ManagedChannelBuilder; +import io.opentelemetry.context.Context; import java.util.Map; import javax.annotation.Nonnull; import javax.annotation.Nullable; @@ -35,6 +36,7 @@ public interface TraceUtil { static final String SPAN_NAME_COMMIT = "Commit"; static final String SPAN_NAME_RUN_QUERY = "RunQuery"; static final String SPAN_NAME_RUN_AGGREGATION_QUERY = "RunAggregationQuery"; + static final String SPAN_NAME_TRANSACTION_RUN = "Transaction.run"; static final String SPAN_NAME_BEGIN_TRANSACTION = "Transaction.Begin"; static final String SPAN_NAME_TRANSACTION_LOOKUP = "Transaction.Lookup"; static final String SPAN_NAME_TRANSACTION_COMMIT = "Transaction.Commit"; diff --git a/google-cloud-datastore/src/test/java/com/google/cloud/datastore/it/ITE2ETracingTest.java b/google-cloud-datastore/src/test/java/com/google/cloud/datastore/it/ITE2ETracingTest.java index 67df8c44b..d2cffc5db 100644 --- a/google-cloud-datastore/src/test/java/com/google/cloud/datastore/it/ITE2ETracingTest.java +++ b/google-cloud-datastore/src/test/java/com/google/cloud/datastore/it/ITE2ETracingTest.java @@ -22,7 +22,9 @@ import static com.google.cloud.datastore.telemetry.TraceUtil.SPAN_NAME_LOOKUP; import static com.google.cloud.datastore.telemetry.TraceUtil.SPAN_NAME_RUN_AGGREGATION_QUERY; import static com.google.cloud.datastore.telemetry.TraceUtil.SPAN_NAME_RUN_QUERY; +import static com.google.cloud.datastore.telemetry.TraceUtil.SPAN_NAME_TRANSACTION_COMMIT; import static com.google.cloud.datastore.telemetry.TraceUtil.SPAN_NAME_TRANSACTION_LOOKUP; +import static com.google.cloud.datastore.telemetry.TraceUtil.SPAN_NAME_TRANSACTION_RUN; import static com.google.common.truth.Truth.assertThat; import static io.opentelemetry.semconv.resource.attributes.ResourceAttributes.SERVICE_NAME; import static org.junit.Assert.assertEquals; @@ -749,6 +751,7 @@ public void transactionalLookupTest() throws Exception { try (Scope ignored = rootSpan.makeCurrent()) { Transaction transaction = datastore.newTransaction(); Entity entity = datastore.get(KEY1, ReadOption.transactionId(transaction.getTransactionId())); + transaction.commit(); assertNull(entity); } finally { rootSpan.end(); @@ -757,18 +760,55 @@ public void transactionalLookupTest() throws Exception { fetchAndValidateTrace( customSpanContext.getTraceId(), - /*numExpectedSpans=*/ 2, - Collections.singletonList(Collections.singletonList(SPAN_NAME_BEGIN_TRANSACTION))); + /*numExpectedSpans=*/ 3, + Arrays.asList( + Collections.singletonList(SPAN_NAME_BEGIN_TRANSACTION), + Collections.singletonList(SPAN_NAME_TRANSACTION_LOOKUP), + Collections.singletonList(SPAN_NAME_TRANSACTION_COMMIT))); + } + + @Test + public void runInTransactionQueryTest() throws Exception { + Entity entity1 = Entity.newBuilder(KEY1).set("test_field", "test_value1").build(); + Entity entity2 = Entity.newBuilder(KEY2).set("test_field", "test_value2").build(); + List entityList = new ArrayList<>(); + entityList.add(entity1); + entityList.add(entity2); + + List response = datastore.add(entity1, entity2); + assertEquals(entityList, response); + + assertNotNull(customSpanContext); + + Span rootSpan = getNewRootSpanWithContext(); + try (Scope ignored = rootSpan.makeCurrent()) { + PropertyFilter filter = PropertyFilter.eq("test_field", entity1.getValue("test_field")); + Query query = + Query.newEntityQueryBuilder().setKind(KEY1.getKind()).setFilter(filter).build(); + Datastore.TransactionCallable callable = + transaction -> { + QueryResults queryResults = datastore.run(query); + assertTrue(queryResults.hasNext()); + assertEquals(entity1, queryResults.next()); + assertFalse(queryResults.hasNext()); + return true; + }; + datastore.runInTransaction(callable); + } finally { + rootSpan.end(); + } + waitForTracesToComplete(); fetchAndValidateTrace( customSpanContext.getTraceId(), - /*numExpectedSpans=*/ 2, - Collections.singletonList(Collections.singletonList(SPAN_NAME_TRANSACTION_LOOKUP))); + /*numExpectedSpans=*/ 4, + Arrays.asList( + Collections.singletonList(SPAN_NAME_TRANSACTION_RUN), + Collections.singletonList(SPAN_NAME_BEGIN_TRANSACTION), + Collections.singletonList(SPAN_NAME_RUN_QUERY), + Collections.singletonList(SPAN_NAME_TRANSACTION_COMMIT))); } - @Test - public void runInTransactionQueryTest() throws Exception {} - @Test public void runInTransactionAggregationQueryTest() throws Exception {}