Skip to content

Commit

Permalink
instrumenting cassandra executeReactive method
Browse files Browse the repository at this point in the history
  • Loading branch information
SimoneGiusso committed Aug 8, 2022
1 parent f46ca5d commit cf9763c
Show file tree
Hide file tree
Showing 4 changed files with 55 additions and 2 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -12,10 +12,12 @@ muzzle {
}

dependencies {
library("com.datastax.oss:java-driver-core:4.0.0")
library("com.datastax.oss:java-driver-core:4.14.1")

compileOnly("com.google.auto.value:auto-value-annotations")
annotationProcessor("com.google.auto.value:auto-value")

testImplementation("io.projectreactor:reactor-core:3.4.21")
}

tasks {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ public void onEnd(
SemanticAttributes.DB_CASSANDRA_SPECULATIVE_EXECUTION_COUNT,
executionInfo.getSpeculativeExecutionCount());

Statement<?> statement = executionInfo.getStatement();
Statement<?> statement = (Statement<?>) executionInfo.getRequest();
String consistencyLevel;
DriverExecutionProfile config =
request.getSession().getContext().getConfig().getDefaultProfile();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,8 @@

import static io.opentelemetry.javaagent.instrumentation.cassandra.v4_0.CassandraSingletons.instrumenter;

import com.datastax.dse.driver.api.core.cql.reactive.ReactiveResultSet;
import com.datastax.dse.driver.internal.core.cql.reactive.DefaultReactiveResultSet;
import com.datastax.oss.driver.api.core.CqlIdentifier;
import com.datastax.oss.driver.api.core.CqlSession;
import com.datastax.oss.driver.api.core.DriverException;
Expand Down Expand Up @@ -186,6 +188,15 @@ public ResultSet execute(Statement<?> statement) {
return resultSet;
}

@Override
public ReactiveResultSet executeReactive(Statement<?> statement) {
String query = getQuery(statement);
CassandraRequest request = CassandraRequest.create(session, query);
return new DefaultReactiveResultSet(() ->
executeAsync(request, () -> session.executeAsync(query))
);
}

@Override
public CompletionStage<AsyncResultSet> executeAsync(Statement<?> statement) {
String query = getQuery(statement);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ import io.opentelemetry.instrumentation.test.AgentInstrumentationSpecification
import io.opentelemetry.instrumentation.test.asserts.TraceAssert
import io.opentelemetry.sdk.trace.data.SpanData
import io.opentelemetry.semconv.trace.attributes.SemanticAttributes
import reactor.core.publisher.Flux
import org.slf4j.Logger
import org.slf4j.LoggerFactory
import org.testcontainers.containers.GenericContainer
Expand Down Expand Up @@ -110,6 +111,45 @@ class CassandraClientTest extends AgentInstrumentationSpecification {
"async_test" | "SELECT * FROM users where name = 'alice' ALLOW FILTERING" | "SELECT * FROM users where name = ? ALLOW FILTERING" | "SELECT async_test.users" | "SELECT" | "users"
}

def "test reactive"() {
setup:
CqlSession session = getSession(keyspace)

runWithSpan("parent") {
Flux.from(session.executeReactive(statement)).doOnComplete({ result ->
runWithSpan("child") {}
}).blockLast()
}

expect:
assertTraces(1) {
trace(0, 3) {
span(0) {
name "parent"
kind SpanKind.INTERNAL
hasNoParent()
}
cassandraSpan(it, 1, spanName, expectedStatement, operation, keyspace, table, span(0))
span(2) {
name "child"
kind SpanKind.INTERNAL
childOf span(0)
}
}
}

cleanup:
session.close()

where:
keyspace | statement | expectedStatement | spanName | operation | table
null | "DROP KEYSPACE IF EXISTS reactive_test" | "DROP KEYSPACE IF EXISTS reactive_test" | "DB Query" | null | null
null | "CREATE KEYSPACE reactive_test WITH REPLICATION = {'class':'SimpleStrategy', 'replication_factor':3}" | "CREATE KEYSPACE reactive_test WITH REPLICATION = {?:?, ?:?}" | "DB Query" | null | null
"reactive_test" | "CREATE TABLE reactive_test.users ( id UUID PRIMARY KEY, name text )" | "CREATE TABLE reactive_test.users ( id UUID PRIMARY KEY, name text )" | "reactive_test" | null | null
"reactive_test" | "INSERT INTO reactive_test.users (id, name) values (uuid(), 'alice')" | "INSERT INTO reactive_test.users (id, name) values (uuid(), ?)" | "INSERT reactive_test.users" | "INSERT" | "reactive_test.users"
"reactive_test" | "SELECT * FROM users where name = 'alice' ALLOW FILTERING" | "SELECT * FROM users where name = ? ALLOW FILTERING" | "SELECT reactive_test.users" | "SELECT" | "users"
}

def cassandraSpan(TraceAssert trace, int index, String spanName, String statement, String operation, String keyspace, String table, Object parentSpan = null) {
trace.span(index) {
name spanName
Expand Down

0 comments on commit cf9763c

Please sign in to comment.