Skip to content

Commit

Permalink
feat: support for transactional operations
Browse files Browse the repository at this point in the history
- tested using newTransaction() and runInTransaction()
  • Loading branch information
jimit-j-shah committed May 30, 2024
1 parent aa7a1dc commit 10341c0
Show file tree
Hide file tree
Showing 4 changed files with 91 additions and 24 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import com.google.cloud.ServiceOptions;
import com.google.cloud.datastore.execution.AggregationQueryExecutor;
import com.google.cloud.datastore.spi.v1.DatastoreRpc;
import com.google.cloud.datastore.telemetry.TraceUtil.Context;
import com.google.common.base.MoreObjects;
import com.google.common.base.Preconditions;
import com.google.common.collect.AbstractIterator;
Expand Down Expand Up @@ -52,6 +53,7 @@
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.Callable;
import javax.annotation.Nonnull;

final class DatastoreImpl extends BaseService<DatastoreOptions> implements Datastore {

Expand Down Expand Up @@ -103,13 +105,18 @@ static class ReadWriteTransactionCallable<T> implements Callable<T> {
private final TransactionCallable<T> callable;
private volatile TransactionOptions options;
private volatile Transaction transaction;
@Nonnull private final Context transactionTraceContext;

ReadWriteTransactionCallable(
Datastore datastore, TransactionCallable<T> callable, TransactionOptions options) {
Datastore datastore,
TransactionCallable<T> callable,
TransactionOptions options,
@Nonnull Context parentTraceContext) {
this.datastore = datastore;
this.callable = callable;
this.options = options;
this.transaction = null;
this.transactionTraceContext = parentTraceContext;
}

Datastore getDatastore() {
Expand All @@ -132,8 +139,9 @@ void setPrevTransactionId(ByteString transactionId) {

@Override
public T call() throws DatastoreException {
transaction = datastore.newTransaction(options);
try {
try (com.google.cloud.datastore.telemetry.TraceUtil.Scope ignored =
transactionTraceContext.makeCurrent()) {
transaction = datastore.newTransaction(options);
T value = callable.run(transaction);
transaction.commit();
return value;
Expand All @@ -154,36 +162,41 @@ public T call() throws DatastoreException {

@Override
public <T> T runInTransaction(final TransactionCallable<T> callable) {
Span span = traceUtil.startSpan(TraceUtil.SPAN_NAME_TRANSACTION);
try (Scope scope = traceUtil.getTracer().withSpan(span)) {
com.google.cloud.datastore.telemetry.TraceUtil.Span span =
otelTraceUtil.startSpan(
com.google.cloud.datastore.telemetry.TraceUtil.SPAN_NAME_TRANSACTION_RUN);
try (com.google.cloud.datastore.telemetry.TraceUtil.Scope ignored = span.makeCurrent()) {
return RetryHelper.runWithRetries(
new ReadWriteTransactionCallable<T>(this, callable, null),
new ReadWriteTransactionCallable<T>(this, callable, null, otelTraceUtil.currentContext()),
retrySettings,
TRANSACTION_EXCEPTION_HANDLER,
getOptions().getClock());
} catch (RetryHelperException e) {
span.setStatus(Status.UNKNOWN.withDescription(e.getMessage()));
span.end(e);
throw DatastoreException.translateAndThrow(e);
} finally {
span.end(TraceUtil.END_SPAN_OPTIONS);
span.end();
}
}

@Override
public <T> T runInTransaction(
final TransactionCallable<T> callable, TransactionOptions transactionOptions) {
Span span = traceUtil.startSpan(TraceUtil.SPAN_NAME_TRANSACTION);
try (Scope scope = traceUtil.getTracer().withSpan(span)) {
com.google.cloud.datastore.telemetry.TraceUtil.Span span =
otelTraceUtil.startSpan(
com.google.cloud.datastore.telemetry.TraceUtil.SPAN_NAME_TRANSACTION_RUN);
try (com.google.cloud.datastore.telemetry.TraceUtil.Scope ignored = span.makeCurrent()) {
return RetryHelper.runWithRetries(
new ReadWriteTransactionCallable<T>(this, callable, transactionOptions),
new ReadWriteTransactionCallable<T>(
this, callable, transactionOptions, otelTraceUtil.currentContext()),
retrySettings,
TRANSACTION_EXCEPTION_HANDLER,
getOptions().getClock());
} catch (RetryHelperException e) {
span.setStatus(Status.UNKNOWN.withDescription(e.getMessage()));
span.end(e);
throw DatastoreException.translateAndThrow(e);
} finally {
span.end(TraceUtil.END_SPAN_OPTIONS);
span.end();
}
}

Expand Down Expand Up @@ -634,10 +647,14 @@ private com.google.datastore.v1.CommitResponse commitMutation(

com.google.datastore.v1.CommitResponse commit(
final com.google.datastore.v1.CommitRequest requestPb) {
com.google.cloud.datastore.telemetry.TraceUtil.Span span =
otelTraceUtil.startSpan(com.google.cloud.datastore.telemetry.TraceUtil.SPAN_NAME_COMMIT);
final boolean isTransactional =
requestPb.hasTransaction() || requestPb.hasSingleUseTransaction();
final String spanName =
isTransactional
? com.google.cloud.datastore.telemetry.TraceUtil.SPAN_NAME_TRANSACTION_COMMIT
: com.google.cloud.datastore.telemetry.TraceUtil.SPAN_NAME_COMMIT;
com.google.cloud.datastore.telemetry.TraceUtil.Span span = otelTraceUtil.startSpan(spanName);
span.setAttribute("isTransactional", requestPb.hasTransaction());

try (com.google.cloud.datastore.telemetry.TraceUtil.Scope ignored = span.makeCurrent()) {
return RetryHelper.runWithRetries(
() -> datastoreRpc.commit(requestPb),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,8 @@

import com.google.api.core.BetaApi;
import com.google.cloud.datastore.models.ExplainOptions;
import com.google.cloud.datastore.telemetry.TraceUtil.Context;
import com.google.cloud.datastore.telemetry.TraceUtil.Scope;
import com.google.common.collect.ImmutableList;
import com.google.datastore.v1.ReadOptions;
import com.google.datastore.v1.TransactionOptions;
Expand All @@ -28,6 +30,7 @@
import java.util.Iterator;
import java.util.List;
import java.util.Optional;
import javax.annotation.Nonnull;

final class TransactionImpl extends BaseDatastoreBatchWriter implements Transaction {

Expand All @@ -37,6 +40,8 @@ final class TransactionImpl extends BaseDatastoreBatchWriter implements Transact

private final ReadOptionProtoPreparer readOptionProtoPreparer;

@Nonnull private final Context transactionTraceContext;

static class ResponseImpl implements Transaction.Response {

private final com.google.datastore.v1.CommitResponse response;
Expand Down Expand Up @@ -78,6 +83,7 @@ public List<Key> getGeneratedKeys() {

transactionId = datastore.requestTransactionId(requestPb);
this.readOptionProtoPreparer = new ReadOptionProtoPreparer();
this.transactionTraceContext = datastore.getOptions().getTraceUtil().currentContext();
}

@Override
Expand All @@ -96,7 +102,9 @@ public Iterator<Entity> get(Key... keys) {
@Override
public List<Entity> fetch(Key... keys) {
validateActive();
return DatastoreHelper.fetch(this, keys);
try (Scope ignored = transactionTraceContext.makeCurrent()) {
return DatastoreHelper.fetch(this, keys);
}
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import com.google.api.core.InternalExtensionOnly;
import com.google.cloud.datastore.DatastoreOptions;
import io.grpc.ManagedChannelBuilder;
import io.opentelemetry.context.Context;
import java.util.Map;
import javax.annotation.Nonnull;
import javax.annotation.Nullable;
Expand All @@ -35,6 +36,7 @@ public interface TraceUtil {
static final String SPAN_NAME_COMMIT = "Commit";
static final String SPAN_NAME_RUN_QUERY = "RunQuery";
static final String SPAN_NAME_RUN_AGGREGATION_QUERY = "RunAggregationQuery";
static final String SPAN_NAME_TRANSACTION_RUN = "Transaction.run";
static final String SPAN_NAME_BEGIN_TRANSACTION = "Transaction.Begin";
static final String SPAN_NAME_TRANSACTION_LOOKUP = "Transaction.Lookup";
static final String SPAN_NAME_TRANSACTION_COMMIT = "Transaction.Commit";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,9 @@
import static com.google.cloud.datastore.telemetry.TraceUtil.SPAN_NAME_LOOKUP;
import static com.google.cloud.datastore.telemetry.TraceUtil.SPAN_NAME_RUN_AGGREGATION_QUERY;
import static com.google.cloud.datastore.telemetry.TraceUtil.SPAN_NAME_RUN_QUERY;
import static com.google.cloud.datastore.telemetry.TraceUtil.SPAN_NAME_TRANSACTION_COMMIT;
import static com.google.cloud.datastore.telemetry.TraceUtil.SPAN_NAME_TRANSACTION_LOOKUP;
import static com.google.cloud.datastore.telemetry.TraceUtil.SPAN_NAME_TRANSACTION_RUN;
import static com.google.common.truth.Truth.assertThat;
import static io.opentelemetry.semconv.resource.attributes.ResourceAttributes.SERVICE_NAME;
import static org.junit.Assert.assertEquals;
Expand Down Expand Up @@ -749,6 +751,7 @@ public void transactionalLookupTest() throws Exception {
try (Scope ignored = rootSpan.makeCurrent()) {
Transaction transaction = datastore.newTransaction();
Entity entity = datastore.get(KEY1, ReadOption.transactionId(transaction.getTransactionId()));
transaction.commit();
assertNull(entity);
} finally {
rootSpan.end();
Expand All @@ -757,18 +760,55 @@ public void transactionalLookupTest() throws Exception {

fetchAndValidateTrace(
customSpanContext.getTraceId(),
/*numExpectedSpans=*/ 2,
Collections.singletonList(Collections.singletonList(SPAN_NAME_BEGIN_TRANSACTION)));
/*numExpectedSpans=*/ 3,
Arrays.asList(
Collections.singletonList(SPAN_NAME_BEGIN_TRANSACTION),
Collections.singletonList(SPAN_NAME_TRANSACTION_LOOKUP),
Collections.singletonList(SPAN_NAME_TRANSACTION_COMMIT)));
}

@Test
public void runInTransactionQueryTest() throws Exception {
Entity entity1 = Entity.newBuilder(KEY1).set("test_field", "test_value1").build();
Entity entity2 = Entity.newBuilder(KEY2).set("test_field", "test_value2").build();
List<Entity> entityList = new ArrayList<>();
entityList.add(entity1);
entityList.add(entity2);

List<Entity> response = datastore.add(entity1, entity2);
assertEquals(entityList, response);

assertNotNull(customSpanContext);

Span rootSpan = getNewRootSpanWithContext();
try (Scope ignored = rootSpan.makeCurrent()) {
PropertyFilter filter = PropertyFilter.eq("test_field", entity1.getValue("test_field"));
Query<Entity> query =
Query.newEntityQueryBuilder().setKind(KEY1.getKind()).setFilter(filter).build();
Datastore.TransactionCallable<Boolean> callable =
transaction -> {
QueryResults<Entity> queryResults = datastore.run(query);
assertTrue(queryResults.hasNext());
assertEquals(entity1, queryResults.next());
assertFalse(queryResults.hasNext());
return true;
};
datastore.runInTransaction(callable);
} finally {
rootSpan.end();
}
waitForTracesToComplete();

fetchAndValidateTrace(
customSpanContext.getTraceId(),
/*numExpectedSpans=*/ 2,
Collections.singletonList(Collections.singletonList(SPAN_NAME_TRANSACTION_LOOKUP)));
/*numExpectedSpans=*/ 4,
Arrays.asList(
Collections.singletonList(SPAN_NAME_TRANSACTION_RUN),
Collections.singletonList(SPAN_NAME_BEGIN_TRANSACTION),
Collections.singletonList(SPAN_NAME_RUN_QUERY),
Collections.singletonList(SPAN_NAME_TRANSACTION_COMMIT)));
}

@Test
public void runInTransactionQueryTest() throws Exception {}

@Test
public void runInTransactionAggregationQueryTest() throws Exception {}

Expand Down

0 comments on commit 10341c0

Please sign in to comment.