Skip to content

Commit

Permalink
HBASE-26473 Introduce db.hbase.container_operations span attribute
Browse files Browse the repository at this point in the history
For batch operations, collect and annotate the associated span with the set of all operations
contained in the batch.

Signed-off-by: Duo Zhang <zhangduo@apache.org>
  • Loading branch information
ndimiduk committed Jan 26, 2022
1 parent 7be9f26 commit ef97eec
Show file tree
Hide file tree
Showing 4 changed files with 140 additions and 27 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -348,7 +348,8 @@ public CompletableFuture<Boolean> thenPut(Put put) {
validatePut(put, conn.connConf.getMaxKeyValueSize());
preCheck();
final Supplier<Span> supplier = newTableOperationSpanBuilder()
.setOperation(HBaseSemanticAttributes.Operation.CHECK_AND_MUTATE);
.setOperation(HBaseSemanticAttributes.Operation.CHECK_AND_MUTATE)
.setContainerOperations(put);
return tracedFuture(
() -> RawAsyncTableImpl.this.<Boolean> newCaller(row, put.getPriority(), rpcTimeoutNs)
.action((controller, loc, stub) -> RawAsyncTableImpl.mutate(controller, loc, stub, put,
Expand All @@ -363,7 +364,8 @@ public CompletableFuture<Boolean> thenPut(Put put) {
public CompletableFuture<Boolean> thenDelete(Delete delete) {
preCheck();
final Supplier<Span> supplier = newTableOperationSpanBuilder()
.setOperation(HBaseSemanticAttributes.Operation.CHECK_AND_MUTATE);
.setOperation(HBaseSemanticAttributes.Operation.CHECK_AND_MUTATE)
.setContainerOperations(delete);
return tracedFuture(
() -> RawAsyncTableImpl.this.<Boolean> newCaller(row, delete.getPriority(), rpcTimeoutNs)
.action((controller, loc, stub) -> RawAsyncTableImpl.mutate(controller, loc, stub, delete,
Expand All @@ -379,7 +381,8 @@ public CompletableFuture<Boolean> thenMutate(RowMutations mutations) {
preCheck();
validatePutsInRowMutations(mutations, conn.connConf.getMaxKeyValueSize());
final Supplier<Span> supplier = newTableOperationSpanBuilder()
.setOperation(HBaseSemanticAttributes.Operation.CHECK_AND_MUTATE);
.setOperation(HBaseSemanticAttributes.Operation.CHECK_AND_MUTATE)
.setContainerOperations(mutations);
return tracedFuture(
() -> RawAsyncTableImpl.this
.<Boolean> newCaller(row, mutations.getMaxPriority(), rpcTimeoutNs)
Expand Down Expand Up @@ -422,7 +425,8 @@ public CheckAndMutateWithFilterBuilder timeRange(TimeRange timeRange) {
public CompletableFuture<Boolean> thenPut(Put put) {
validatePut(put, conn.connConf.getMaxKeyValueSize());
final Supplier<Span> supplier = newTableOperationSpanBuilder()
.setOperation(HBaseSemanticAttributes.Operation.CHECK_AND_MUTATE);
.setOperation(HBaseSemanticAttributes.Operation.CHECK_AND_MUTATE)
.setContainerOperations(put);
return tracedFuture(
() -> RawAsyncTableImpl.this.<Boolean> newCaller(row, put.getPriority(), rpcTimeoutNs)
.action((controller, loc, stub) -> RawAsyncTableImpl.mutate(controller, loc,
Expand All @@ -437,7 +441,8 @@ public CompletableFuture<Boolean> thenPut(Put put) {
@Override
public CompletableFuture<Boolean> thenDelete(Delete delete) {
final Supplier<Span> supplier = newTableOperationSpanBuilder()
.setOperation(HBaseSemanticAttributes.Operation.CHECK_AND_MUTATE);
.setOperation(HBaseSemanticAttributes.Operation.CHECK_AND_MUTATE)
.setContainerOperations(delete);
return tracedFuture(
() -> RawAsyncTableImpl.this.<Boolean> newCaller(row, delete.getPriority(), rpcTimeoutNs)
.action((controller, loc, stub) -> RawAsyncTableImpl.mutate(controller, loc, stub, delete,
Expand All @@ -452,7 +457,8 @@ public CompletableFuture<Boolean> thenDelete(Delete delete) {
public CompletableFuture<Boolean> thenMutate(RowMutations mutations) {
validatePutsInRowMutations(mutations, conn.connConf.getMaxKeyValueSize());
final Supplier<Span> supplier = newTableOperationSpanBuilder()
.setOperation(HBaseSemanticAttributes.Operation.CHECK_AND_MUTATE);
.setOperation(HBaseSemanticAttributes.Operation.CHECK_AND_MUTATE)
.setContainerOperations(mutations);
return tracedFuture(
() -> RawAsyncTableImpl.this
.<Boolean> newCaller(row, mutations.getMaxPriority(), rpcTimeoutNs)
Expand All @@ -474,7 +480,8 @@ public CheckAndMutateWithFilterBuilder checkAndMutate(byte[] row, Filter filter)
@Override
public CompletableFuture<CheckAndMutateResult> checkAndMutate(CheckAndMutate checkAndMutate) {
final Supplier<Span> supplier = newTableOperationSpanBuilder()
.setOperation(checkAndMutate);
.setOperation(checkAndMutate)
.setContainerOperations(checkAndMutate.getAction());
return tracedFuture(() -> {
if (checkAndMutate.getAction() instanceof Put ||
checkAndMutate.getAction() instanceof Delete ||
Expand Down Expand Up @@ -527,7 +534,8 @@ public CompletableFuture<CheckAndMutateResult> checkAndMutate(CheckAndMutate che
public List<CompletableFuture<CheckAndMutateResult>>
checkAndMutate(List<CheckAndMutate> checkAndMutates) {
final Supplier<Span> supplier = newTableOperationSpanBuilder()
.setOperation(checkAndMutates);
.setOperation(checkAndMutates)
.setContainerOperations(checkAndMutates);
return tracedFutures(
() -> batch(checkAndMutates, rpcTimeoutNs).stream()
.map(f -> f.thenApply(r -> (CheckAndMutateResult) r)).collect(toList()),
Expand Down Expand Up @@ -583,7 +591,8 @@ public CompletableFuture<Result> mutateRow(RowMutations mutations) {
long nonceGroup = conn.getNonceGenerator().getNonceGroup();
long nonce = conn.getNonceGenerator().newNonce();
final Supplier<Span> supplier = newTableOperationSpanBuilder()
.setOperation(mutations);
.setOperation(mutations)
.setContainerOperations(mutations);
return tracedFuture(
() -> this
.<Result> newCaller(mutations.getRow(), mutations.getMaxPriority(), writeRpcTimeoutNs)
Expand Down Expand Up @@ -656,28 +665,32 @@ public void onComplete() {
@Override
public List<CompletableFuture<Result>> get(List<Get> gets) {
final Supplier<Span> supplier = newTableOperationSpanBuilder()
.setOperation(gets);
.setOperation(gets)
.setContainerOperations(HBaseSemanticAttributes.Operation.GET);
return tracedFutures(() -> batch(gets, readRpcTimeoutNs), supplier);
}

@Override
public List<CompletableFuture<Void>> put(List<Put> puts) {
final Supplier<Span> supplier = newTableOperationSpanBuilder()
.setOperation(puts);
.setOperation(puts)
.setContainerOperations(HBaseSemanticAttributes.Operation.PUT);
return tracedFutures(() -> voidMutate(puts), supplier);
}

@Override
public List<CompletableFuture<Void>> delete(List<Delete> deletes) {
final Supplier<Span> supplier = newTableOperationSpanBuilder()
.setOperation(deletes);
.setOperation(deletes)
.setContainerOperations(HBaseSemanticAttributes.Operation.DELETE);
return tracedFutures(() -> voidMutate(deletes), supplier);
}

@Override
public <T> List<CompletableFuture<T>> batch(List<? extends Row> actions) {
final Supplier<Span> supplier = newTableOperationSpanBuilder()
.setOperation(actions);
.setOperation(actions)
.setContainerOperations(actions);
return tracedFutures(() -> batch(actions, rpcTimeoutNs), supplier);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,15 +18,22 @@

package org.apache.hadoop.hbase.client.trace;

import static org.apache.hadoop.hbase.trace.HBaseSemanticAttributes.CONTAINER_DB_OPERATIONS_KEY;
import static org.apache.hadoop.hbase.trace.HBaseSemanticAttributes.DB_OPERATION;
import io.opentelemetry.api.common.AttributeKey;
import io.opentelemetry.api.trace.Span;
import io.opentelemetry.api.trace.SpanBuilder;
import io.opentelemetry.api.trace.SpanKind;
import java.util.Arrays;
import java.util.Collection;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.function.Supplier;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Append;
import org.apache.hadoop.hbase.client.AsyncConnectionImpl;
Expand Down Expand Up @@ -85,6 +92,72 @@ public TableOperationSpanBuilder setOperation(final Operation operation) {
return this;
}

// `setContainerOperations` perform a recursive descent expansion of all the operations
// contained within the provided "batch" object.

public TableOperationSpanBuilder setContainerOperations(final RowMutations mutations) {
final Operation[] ops = mutations.getMutations()
.stream()
.flatMap(row -> Stream.concat(Stream.of(valueFrom(row)), unpackRowOperations(row).stream()))
.toArray(Operation[]::new);
return setContainerOperations(ops);
}

public TableOperationSpanBuilder setContainerOperations(final Row row) {
final Operation[] ops =
Stream.concat(Stream.of(valueFrom(row)), unpackRowOperations(row).stream())
.toArray(Operation[]::new);
return setContainerOperations(ops);
}

public TableOperationSpanBuilder setContainerOperations(
final Collection<? extends Row> operations
) {
final Operation[] ops = operations.stream()
.flatMap(row -> Stream.concat(Stream.of(valueFrom(row)), unpackRowOperations(row).stream()))
.toArray(Operation[]::new);
return setContainerOperations(ops);
}

private static Set<Operation> unpackRowOperations(final Row row) {
final Set<Operation> ops = new HashSet<>();
if (row instanceof CheckAndMutate) {
final CheckAndMutate cam = (CheckAndMutate) row;
ops.addAll(unpackRowOperations(cam));
}
if (row instanceof RowMutations) {
final RowMutations mutations = (RowMutations) row;
ops.addAll(unpackRowOperations(mutations));
}
return ops;
}

private static Set<Operation> unpackRowOperations(final CheckAndMutate cam) {
final Set<Operation> ops = new HashSet<>();
final Operation op = valueFrom(cam.getAction());
switch (op) {
case BATCH:
case CHECK_AND_MUTATE:
ops.addAll(unpackRowOperations(cam.getAction()));
break;
default:
ops.add(op);
}
return ops;
}

public TableOperationSpanBuilder setContainerOperations(
final Operation... operations
) {
final List<String> ops = Arrays.stream(operations)
.map(op -> op == null ? unknown : op.name())
.sorted()
.distinct()
.collect(Collectors.toList());
attributes.put(CONTAINER_DB_OPERATIONS_KEY, ops);
return this;
}

public TableOperationSpanBuilder setTableName(final TableName tableName) {
this.tableName = tableName;
TableSpanBuilder.populateTableNameAttributes(attributes, tableName);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@
*/
package org.apache.hadoop.hbase.client;

import static org.apache.hadoop.hbase.client.trace.hamcrest.AttributesMatchers.containsEntryWithStringValuesOf;
import static org.apache.hadoop.hbase.client.trace.hamcrest.SpanDataMatchers.hasAttributes;
import static org.apache.hadoop.hbase.client.trace.hamcrest.SpanDataMatchers.hasEnded;
import static org.apache.hadoop.hbase.client.trace.hamcrest.SpanDataMatchers.hasKind;
import static org.apache.hadoop.hbase.client.trace.hamcrest.SpanDataMatchers.hasName;
Expand Down Expand Up @@ -333,15 +335,19 @@ public void testCheckAndMutateList() {
.ifEquals(Bytes.toBytes("cf"), Bytes.toBytes("cq"), Bytes.toBytes("v"))
.build(new Delete(Bytes.toBytes(0))))).toArray(new CompletableFuture[0]))
.join();
assertTrace("BATCH");
assertTrace("BATCH", hasAttributes(
containsEntryWithStringValuesOf(
"db.hbase.container_operations", "CHECK_AND_MUTATE", "DELETE")));
}

@Test
public void testCheckAndMutateAll() {
table.checkAndMutateAll(Arrays.asList(CheckAndMutate.newBuilder(Bytes.toBytes(0))
.ifEquals(Bytes.toBytes("cf"), Bytes.toBytes("cq"), Bytes.toBytes("v"))
.build(new Delete(Bytes.toBytes(0))))).join();
assertTrace("BATCH");
assertTrace("BATCH", hasAttributes(
containsEntryWithStringValuesOf(
"db.hbase.container_operations", "CHECK_AND_MUTATE", "DELETE")));
}

private void testCheckAndMutateBuilder(Row op) {
Expand Down Expand Up @@ -427,8 +433,13 @@ public void testCheckAndMutateWithFilterBuilderThenMutations() throws IOExceptio

@Test
public void testMutateRow() throws IOException {
table.mutateRow(new RowMutations(Bytes.toBytes(0)).add(new Delete(Bytes.toBytes(0))));
assertTrace("BATCH");
final RowMutations mutations = new RowMutations(Bytes.toBytes(0))
.add(new Put(Bytes.toBytes(0))
.addColumn(Bytes.toBytes("cf"), Bytes.toBytes("cq"), Bytes.toBytes("v")))
.add(new Delete(Bytes.toBytes(0)));
table.mutateRow(mutations).join();
assertTrace("BATCH", hasAttributes(
containsEntryWithStringValuesOf("db.hbase.container_operations", "DELETE", "PUT")));
}

@Test
Expand All @@ -443,27 +454,31 @@ public void testExistsList() {
.allOf(
table.exists(Arrays.asList(new Get(Bytes.toBytes(0)))).toArray(new CompletableFuture[0]))
.join();
assertTrace("BATCH");
assertTrace("BATCH", hasAttributes(
containsEntryWithStringValuesOf("db.hbase.container_operations", "GET")));
}

@Test
public void testExistsAll() {
table.existsAll(Arrays.asList(new Get(Bytes.toBytes(0)))).join();
assertTrace("BATCH");
assertTrace("BATCH", hasAttributes(
containsEntryWithStringValuesOf("db.hbase.container_operations", "GET")));
}

@Test
public void testGetList() {
CompletableFuture
.allOf(table.get(Arrays.asList(new Get(Bytes.toBytes(0)))).toArray(new CompletableFuture[0]))
.join();
assertTrace("BATCH");
assertTrace("BATCH", hasAttributes(
containsEntryWithStringValuesOf("db.hbase.container_operations", "GET")));
}

@Test
public void testGetAll() {
table.getAll(Arrays.asList(new Get(Bytes.toBytes(0)))).join();
assertTrace("BATCH");
assertTrace("BATCH", hasAttributes(
containsEntryWithStringValuesOf("db.hbase.container_operations", "GET")));
}

@Test
Expand All @@ -472,14 +487,16 @@ public void testPutList() {
.allOf(table.put(Arrays.asList(new Put(Bytes.toBytes(0)).addColumn(Bytes.toBytes("cf"),
Bytes.toBytes("cq"), Bytes.toBytes("v")))).toArray(new CompletableFuture[0]))
.join();
assertTrace("BATCH");
assertTrace("BATCH", hasAttributes(
containsEntryWithStringValuesOf("db.hbase.container_operations", "PUT")));
}

@Test
public void testPutAll() {
table.putAll(Arrays.asList(new Put(Bytes.toBytes(0)).addColumn(Bytes.toBytes("cf"),
Bytes.toBytes("cq"), Bytes.toBytes("v")))).join();
assertTrace("BATCH");
assertTrace("BATCH", hasAttributes(
containsEntryWithStringValuesOf("db.hbase.container_operations", "PUT")));
}

@Test
Expand All @@ -488,13 +505,15 @@ public void testDeleteList() {
.allOf(
table.delete(Arrays.asList(new Delete(Bytes.toBytes(0)))).toArray(new CompletableFuture[0]))
.join();
assertTrace("BATCH");
assertTrace("BATCH", hasAttributes(
containsEntryWithStringValuesOf("db.hbase.container_operations", "DELETE")));
}

@Test
public void testDeleteAll() {
table.deleteAll(Arrays.asList(new Delete(Bytes.toBytes(0)))).join();
assertTrace("BATCH");
assertTrace("BATCH", hasAttributes(
containsEntryWithStringValuesOf("db.hbase.container_operations", "DELETE")));
}

@Test
Expand All @@ -503,12 +522,14 @@ public void testBatch() {
.allOf(
table.batch(Arrays.asList(new Delete(Bytes.toBytes(0)))).toArray(new CompletableFuture[0]))
.join();
assertTrace("BATCH");
assertTrace("BATCH", hasAttributes(
containsEntryWithStringValuesOf("db.hbase.container_operations", "DELETE")));
}

@Test
public void testBatchAll() {
table.batchAll(Arrays.asList(new Delete(Bytes.toBytes(0)))).join();
assertTrace("BATCH");
assertTrace("BATCH", hasAttributes(
containsEntryWithStringValuesOf("db.hbase.container_operations", "DELETE")));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,12 @@ public final class HBaseSemanticAttributes {
public static final AttributeKey<String> DB_NAME = SemanticAttributes.DB_NAME;
public static final AttributeKey<String> DB_OPERATION = SemanticAttributes.DB_OPERATION;
public static final AttributeKey<String> TABLE_KEY = AttributeKey.stringKey("db.hbase.table");
/**
* For operations that themselves ship one or more operations, such as
* {@link Operation#BATCH} and {@link Operation#CHECK_AND_MUTATE}.
*/
public static final AttributeKey<List<String>> CONTAINER_DB_OPERATIONS_KEY =
AttributeKey.stringArrayKey("db.hbase.container_operations");
public static final AttributeKey<List<String>> REGION_NAMES_KEY =
AttributeKey.stringArrayKey("db.hbase.regions");
public static final AttributeKey<String> RPC_SERVICE_KEY =
Expand Down

0 comments on commit ef97eec

Please sign in to comment.