Skip to content

Commit

Permalink
new cassandra 4.14 module to support reactive tracing
Browse files Browse the repository at this point in the history
  • Loading branch information
SimoneGiusso committed Aug 21, 2022
1 parent f46ca5d commit 634755c
Show file tree
Hide file tree
Showing 16 changed files with 321 additions and 7 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,8 @@ muzzle {
}

dependencies {
implementation(project(":instrumentation:cassandra:cassandra-4.0:library"))

library("com.datastax.oss:java-driver-core:4.0.0")

compileOnly("com.google.auto.value:auto-value-annotations")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
package io.opentelemetry.javaagent.instrumentation.cassandra.v4_0;

import com.datastax.oss.driver.api.core.CqlSession;
import io.opentelemetry.instrumentation.cassandra.v4_0.TracingCqlSession;
import java.util.function.Function;

public class CompletionStageFunction implements Function<Object, Object> {
Expand Down
25 changes: 25 additions & 0 deletions instrumentation/cassandra/cassandra-4.0/library/build.gradle.kts
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
plugins {
id("otel.javaagent-instrumentation")
}

muzzle {
pass {
group.set("com.datastax.oss")
module.set("java-driver-core")
versions.set("[4.0,)")
assertInverse.set(true)
}
}

dependencies {
library("com.datastax.oss:java-driver-core:4.0.0")

compileOnly("com.google.auto.value:auto-value-annotations")
annotationProcessor("com.google.auto.value:auto-value")
}

tasks {
test {
usesService(gradle.sharedServices.registrations["testcontainersBuildService"].getService())
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
* SPDX-License-Identifier: Apache-2.0
*/

package io.opentelemetry.javaagent.instrumentation.cassandra.v4_0;
package io.opentelemetry.instrumentation.cassandra.v4_0;

import com.datastax.oss.driver.api.core.config.DefaultDriverOption;
import com.datastax.oss.driver.api.core.config.DriverExecutionProfile;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
* SPDX-License-Identifier: Apache-2.0
*/

package io.opentelemetry.javaagent.instrumentation.cassandra.v4_0;
package io.opentelemetry.instrumentation.cassandra.v4_0;

import com.datastax.oss.driver.api.core.cql.ExecutionInfo;
import com.datastax.oss.driver.api.core.metadata.Node;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
* SPDX-License-Identifier: Apache-2.0
*/

package io.opentelemetry.javaagent.instrumentation.cassandra.v4_0;
package io.opentelemetry.instrumentation.cassandra.v4_0;

import com.datastax.oss.driver.api.core.session.Session;
import com.google.auto.value.AutoValue;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
* SPDX-License-Identifier: Apache-2.0
*/

package io.opentelemetry.javaagent.instrumentation.cassandra.v4_0;
package io.opentelemetry.instrumentation.cassandra.v4_0;

import com.datastax.oss.driver.api.core.cql.ExecutionInfo;
import io.opentelemetry.api.GlobalOpenTelemetry;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
* SPDX-License-Identifier: Apache-2.0
*/

package io.opentelemetry.javaagent.instrumentation.cassandra.v4_0;
package io.opentelemetry.instrumentation.cassandra.v4_0;

import com.datastax.oss.driver.api.core.CqlIdentifier;
import io.opentelemetry.instrumentation.api.instrumenter.db.SqlClientAttributesGetter;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,9 +3,9 @@
* SPDX-License-Identifier: Apache-2.0
*/

package io.opentelemetry.javaagent.instrumentation.cassandra.v4_0;
package io.opentelemetry.instrumentation.cassandra.v4_0;

import static io.opentelemetry.javaagent.instrumentation.cassandra.v4_0.CassandraSingletons.instrumenter;
import static io.opentelemetry.instrumentation.cassandra.v4_0.CassandraSingletons.instrumenter;

import com.datastax.oss.driver.api.core.CqlIdentifier;
import com.datastax.oss.driver.api.core.CqlSession;
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
plugins {
id("otel.javaagent-instrumentation")
}

muzzle {
pass {
group.set("com.datastax.oss")
module.set("java-driver-core")
versions.set("[4.14,)")
assertInverse.set(true)
}
}

dependencies {
implementation(project(":instrumentation:cassandra:cassandra-4.0:library"))

library("com.datastax.oss:java-driver-core:4.14.1")

compileOnly("com.google.auto.value:auto-value-annotations")
annotationProcessor("com.google.auto.value:auto-value")

testImplementation("io.projectreactor:reactor-core:3.4.21")
}

tasks {
test {
usesService(gradle.sharedServices.registrations["testcontainersBuildService"].getService())
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
/*
* Copyright The OpenTelemetry Authors
* SPDX-License-Identifier: Apache-2.0
*/

package io.opentelemetry.javaagent.instrumentation.cassandra.v4_14;

import static java.util.Collections.singletonList;

import com.google.auto.service.AutoService;
import io.opentelemetry.javaagent.extension.instrumentation.InstrumentationModule;
import io.opentelemetry.javaagent.extension.instrumentation.TypeInstrumentation;
import java.util.List;

@AutoService(InstrumentationModule.class)
public class CassandraClientInstrumentationModule extends InstrumentationModule {
public CassandraClientInstrumentationModule() {
super("cassandra", "cassandra-4.14");
}

@Override
public List<TypeInstrumentation> typeInstrumentations() {
return singletonList(new SessionBuilderInstrumentation());
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
/*
* Copyright The OpenTelemetry Authors
* SPDX-License-Identifier: Apache-2.0
*/

package io.opentelemetry.javaagent.instrumentation.cassandra.v4_14;

import com.datastax.oss.driver.api.core.CqlSession;
import java.util.function.Function;

public class CompletionStageFunction implements Function<Object, Object> {

@Override
public Object apply(Object session) {
if (session == null) {
return null;
}
// This should cover ours and OT's TracingCqlSession
if (session.getClass().getName().endsWith("cassandra4.TracingCqlSession")) {
return session;
}
return new TracingCqlSession((CqlSession) session);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,53 @@
/*
* Copyright The OpenTelemetry Authors
* SPDX-License-Identifier: Apache-2.0
*/

package io.opentelemetry.javaagent.instrumentation.cassandra.v4_14;

import static net.bytebuddy.matcher.ElementMatchers.isMethod;
import static net.bytebuddy.matcher.ElementMatchers.isPublic;
import static net.bytebuddy.matcher.ElementMatchers.named;
import static net.bytebuddy.matcher.ElementMatchers.takesArguments;

import io.opentelemetry.javaagent.extension.instrumentation.TypeInstrumentation;
import io.opentelemetry.javaagent.extension.instrumentation.TypeTransformer;
import java.util.concurrent.CompletionStage;
import net.bytebuddy.asm.Advice;
import net.bytebuddy.description.type.TypeDescription;
import net.bytebuddy.matcher.ElementMatcher;

public class SessionBuilderInstrumentation implements TypeInstrumentation {

@Override
public ElementMatcher<TypeDescription> typeMatcher() {
// Note: Cassandra has a large driver and we instrument single class in it.
// The rest is ignored in AdditionalLibraryIgnoresMatcher
return named("com.datastax.oss.driver.api.core.session.SessionBuilder");
}

@Override
public void transform(TypeTransformer transformer) {
transformer.applyAdviceToMethod(
isMethod().and(isPublic()).and(named("buildAsync")).and(takesArguments(0)),
SessionBuilderInstrumentation.class.getName() + "$BuildAdvice");
}

@SuppressWarnings("unused")
public static class BuildAdvice {

/**
* Strategy: each time we build a connection to a Cassandra cluster, the
* com.datastax.oss.driver.api.core.session.SessionBuilder.buildAsync() method is called. The
* opentracing contribution is a simple wrapper, so we just have to wrap the new session.
*
* @param stage The fresh CompletionStage to patch. This stage produces session which is
* replaced with new session
*/
@Advice.OnMethodExit(suppress = Throwable.class)
public static void injectTracingSession(
@Advice.Return(readOnly = false) CompletionStage<?> stage) {
stage = stage.thenApply(new CompletionStageFunction());
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
/*
* Copyright The OpenTelemetry Authors
* SPDX-License-Identifier: Apache-2.0
*/

package io.opentelemetry.javaagent.instrumentation.cassandra.v4_14;

import com.datastax.dse.driver.api.core.cql.reactive.ReactiveResultSet;
import com.datastax.dse.driver.internal.core.cql.reactive.DefaultReactiveResultSet;
import com.datastax.oss.driver.api.core.CqlSession;
import com.datastax.oss.driver.api.core.cql.Statement;

public class TracingCqlSession
extends io.opentelemetry.instrumentation.cassandra.v4_0.TracingCqlSession {

public TracingCqlSession(CqlSession session) {
super(session);
}

@Override
public ReactiveResultSet executeReactive(Statement<?> statement) {
return new DefaultReactiveResultSet(() -> executeAsync(statement));
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,129 @@
/*
* Copyright The OpenTelemetry Authors
* SPDX-License-Identifier: Apache-2.0
*/

import com.datastax.oss.driver.api.core.CqlSession
import com.datastax.oss.driver.api.core.config.DefaultDriverOption
import com.datastax.oss.driver.api.core.config.DriverConfigLoader
import com.datastax.oss.driver.internal.core.config.typesafe.DefaultDriverConfigLoader
import io.opentelemetry.api.trace.SpanKind
import io.opentelemetry.instrumentation.test.AgentInstrumentationSpecification
import io.opentelemetry.instrumentation.test.asserts.TraceAssert
import io.opentelemetry.sdk.trace.data.SpanData
import io.opentelemetry.semconv.trace.attributes.SemanticAttributes
import reactor.core.publisher.Flux
import org.slf4j.Logger
import org.slf4j.LoggerFactory
import org.testcontainers.containers.GenericContainer
import org.testcontainers.containers.output.Slf4jLogConsumer
import spock.lang.Shared

import java.time.Duration

import static io.opentelemetry.api.trace.SpanKind.CLIENT

class CassandraClientTest extends AgentInstrumentationSpecification {
private static final Logger logger = LoggerFactory.getLogger(CassandraClientTest)

@Shared
GenericContainer cassandra
@Shared
int cassandraPort

def setupSpec() {
cassandra = new GenericContainer("cassandra:4.0")
// limit memory usage
.withEnv("JVM_OPTS", "-Xmx128m -Xms128m")
.withExposedPorts(9042)
.withLogConsumer(new Slf4jLogConsumer(logger))
.withStartupTimeout(Duration.ofSeconds(120))
cassandra.start()

cassandraPort = cassandra.getMappedPort(9042)
}

def cleanupSpec() {
cassandra.stop()
}

def "test reactive"() {
setup:
CqlSession session = getSession(keyspace)

runWithSpan("parent") {
Flux.from(session.executeReactive(statement)).doOnComplete({ result ->
runWithSpan("child") {}
}).blockLast()
}

expect:
assertTraces(1) {
trace(0, 3) {
span(0) {
name "parent"
kind SpanKind.INTERNAL
hasNoParent()
}
cassandraSpan(it, 1, spanName, expectedStatement, operation, keyspace, table, span(0))
span(2) {
name "child"
kind SpanKind.INTERNAL
childOf span(0)
}
}
}

cleanup:
session.close()

where:
keyspace | statement | expectedStatement | spanName | operation | table
null | "DROP KEYSPACE IF EXISTS reactive_test" | "DROP KEYSPACE IF EXISTS reactive_test" | "DB Query" | null | null
null | "CREATE KEYSPACE reactive_test WITH REPLICATION = {'class':'SimpleStrategy', 'replication_factor':3}" | "CREATE KEYSPACE reactive_test WITH REPLICATION = {?:?, ?:?}" | "DB Query" | null | null
"reactive_test" | "CREATE TABLE reactive_test.users ( id UUID PRIMARY KEY, name text )" | "CREATE TABLE reactive_test.users ( id UUID PRIMARY KEY, name text )" | "reactive_test" | null | null
"reactive_test" | "INSERT INTO reactive_test.users (id, name) values (uuid(), 'alice')" | "INSERT INTO reactive_test.users (id, name) values (uuid(), ?)" | "INSERT reactive_test.users" | "INSERT" | "reactive_test.users"
"reactive_test" | "SELECT * FROM users where name = 'alice' ALLOW FILTERING" | "SELECT * FROM users where name = ? ALLOW FILTERING" | "SELECT reactive_test.users" | "SELECT" | "users"
}

def cassandraSpan(TraceAssert trace, int index, String spanName, String statement, String operation, String keyspace, String table, Object parentSpan = null) {
trace.span(index) {
name spanName
kind CLIENT
if (parentSpan == null) {
hasNoParent()
} else {
childOf((SpanData) parentSpan)
}
attributes {
"$SemanticAttributes.NET_PEER_NAME" "localhost"
"$SemanticAttributes.NET_PEER_IP" "127.0.0.1"
"$SemanticAttributes.NET_PEER_PORT" cassandraPort
"$SemanticAttributes.DB_SYSTEM" "cassandra"
"$SemanticAttributes.DB_NAME" keyspace
"$SemanticAttributes.DB_STATEMENT" statement
"$SemanticAttributes.DB_OPERATION" operation
"$SemanticAttributes.DB_CASSANDRA_CONSISTENCY_LEVEL" "LOCAL_ONE"
"$SemanticAttributes.DB_CASSANDRA_COORDINATOR_DC" "datacenter1"
"$SemanticAttributes.DB_CASSANDRA_COORDINATOR_ID" String
"$SemanticAttributes.DB_CASSANDRA_IDEMPOTENCE" Boolean
"$SemanticAttributes.DB_CASSANDRA_PAGE_SIZE" 5000
"$SemanticAttributes.DB_CASSANDRA_SPECULATIVE_EXECUTION_COUNT" 0
// the SqlStatementSanitizer can't handle CREATE statements yet
"$SemanticAttributes.DB_CASSANDRA_TABLE" table
}
}
}

def getSession(String keyspace) {
DriverConfigLoader configLoader = DefaultDriverConfigLoader.builder()
.withDuration(DefaultDriverOption.REQUEST_TIMEOUT, Duration.ofSeconds(0))
.build()
return CqlSession.builder()
.addContactPoint(new InetSocketAddress("localhost", cassandraPort))
.withConfigLoader(configLoader)
.withLocalDatacenter("datacenter1")
.withKeyspace((String) keyspace)
.build()
}
}
2 changes: 2 additions & 0 deletions settings.gradle.kts
Original file line number Diff line number Diff line change
Expand Up @@ -177,6 +177,8 @@ include(":instrumentation:azure-core:azure-core-1.19:javaagent")
include(":instrumentation:azure-core:azure-core-1.19:library-instrumentation-shaded")
include(":instrumentation:cassandra:cassandra-3.0:javaagent")
include(":instrumentation:cassandra:cassandra-4.0:javaagent")
include(":instrumentation:cassandra:cassandra-4.0:library")
include(":instrumentation:cassandra:cassandra-4.14:javaagent")
include(":instrumentation:cdi-testing")
include(":instrumentation:graphql-java-12.0:javaagent")
include(":instrumentation:graphql-java-12.0:library")
Expand Down

0 comments on commit 634755c

Please sign in to comment.