Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Instrument cassandra executeReactive method #6441

Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
Show all changes
22 commits
Select commit Hold shift + click to select a range
634755c
new cassandra 4.14 module to support reactive tracing
SimoneGiusso Aug 21, 2022
f59e4d7
fix DuplicateFileCopyingException
SimoneGiusso Aug 21, 2022
128f0f6
Move CassandraSingletons in instrumentation libraries and add new Cas…
SimoneGiusso Aug 27, 2022
25de63f
Merge branch 'main' into instrumenting-executereactive-cassandra-method
SimoneGiusso Nov 15, 2022
0ca0549
sync with main branch
SimoneGiusso Nov 15, 2022
8841346
WIP: Fix tests
SimoneGiusso Nov 16, 2022
5f03349
WIP: use lowest java-driver-core version that supports reactive
SimoneGiusso Nov 17, 2022
15172a4
WIP: create common test package + wrap original TracingCqlSession
SimoneGiusso Nov 24, 2022
eec8951
Merge branch 'main' into instrumenting-executereactive-cassandra-method
SimoneGiusso Nov 24, 2022
15e4b42
WIP: avoid having both cassandra 4.* modules applied
SimoneGiusso Nov 26, 2022
ad461f6
Address last comments from mateuszrzeszutek's review
SimoneGiusso Nov 27, 2022
3e8ea44
Share code keeping it package protected + clean-up
SimoneGiusso Nov 27, 2022
74fd803
Make CompletionStageFunction public and rename package for classes un…
SimoneGiusso Nov 27, 2022
308a11e
Merge branch 'main' into instrumenting-executereactive-cassandra-method
SimoneGiusso Nov 29, 2022
dc416a8
Merge remote-tracking branch 'upstream/main' into instrumenting-execu…
trask Feb 27, 2023
aced6ee
Move library to 4.4
trask Feb 27, 2023
ed61c54
more
trask Feb 27, 2023
865b337
more
trask Feb 27, 2023
a924133
todo
trask Feb 27, 2023
19225cd
Update docs
trask Feb 27, 2023
2385797
Rewrite cassandra reactive test in java
SimoneGiusso Feb 27, 2023
8593fe4
Remove deprecated ExecutionInfo#getStatement method
SimoneGiusso Feb 27, 2023
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,8 @@ muzzle {
}

dependencies {
implementation(project(":instrumentation:cassandra:cassandra-4.0:library"))

library("com.datastax.oss:java-driver-core:4.0.0")

compileOnly("com.google.auto.value:auto-value-annotations")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
package io.opentelemetry.javaagent.instrumentation.cassandra.v4_0;

import com.datastax.oss.driver.api.core.CqlSession;
import io.opentelemetry.instrumentation.cassandra.v4_0.TracingCqlSession;
import java.util.function.Function;

public class CompletionStageFunction implements Function<Object, Object> {
Expand Down
16 changes: 16 additions & 0 deletions instrumentation/cassandra/cassandra-4.0/library/build.gradle.kts
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
plugins {
id("otel.library-instrumentation")
}

dependencies {
library("com.datastax.oss:java-driver-core:4.0.0")

compileOnly("com.google.auto.value:auto-value-annotations")
annotationProcessor("com.google.auto.value:auto-value")
}

tasks {
test {
usesService(gradle.sharedServices.registrations["testcontainersBuildService"].getService())
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
* SPDX-License-Identifier: Apache-2.0
*/

package io.opentelemetry.javaagent.instrumentation.cassandra.v4_0;
package io.opentelemetry.instrumentation.cassandra.v4_0;

import com.datastax.oss.driver.api.core.config.DefaultDriverOption;
import com.datastax.oss.driver.api.core.config.DriverExecutionProfile;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
* SPDX-License-Identifier: Apache-2.0
*/

package io.opentelemetry.javaagent.instrumentation.cassandra.v4_0;
package io.opentelemetry.instrumentation.cassandra.v4_0;

import com.datastax.oss.driver.api.core.cql.ExecutionInfo;
import com.datastax.oss.driver.api.core.metadata.Node;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
* SPDX-License-Identifier: Apache-2.0
*/

package io.opentelemetry.javaagent.instrumentation.cassandra.v4_0;
package io.opentelemetry.instrumentation.cassandra.v4_0;

import com.datastax.oss.driver.api.core.session.Session;
import com.google.auto.value.AutoValue;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
* SPDX-License-Identifier: Apache-2.0
*/

package io.opentelemetry.javaagent.instrumentation.cassandra.v4_0;
package io.opentelemetry.instrumentation.cassandra.v4_0;

import com.datastax.oss.driver.api.core.cql.ExecutionInfo;
import io.opentelemetry.api.GlobalOpenTelemetry;
Expand All @@ -12,7 +12,6 @@
import io.opentelemetry.instrumentation.api.instrumenter.db.DbClientSpanNameExtractor;
import io.opentelemetry.instrumentation.api.instrumenter.db.SqlClientAttributesExtractor;
import io.opentelemetry.instrumentation.api.instrumenter.net.NetClientAttributesExtractor;
import io.opentelemetry.javaagent.bootstrap.internal.CommonConfig;
import io.opentelemetry.semconv.trace.attributes.SemanticAttributes;

public final class CassandraSingletons {
Expand All @@ -32,8 +31,6 @@ public final class CassandraSingletons {
.addAttributesExtractor(
SqlClientAttributesExtractor.builder(attributesGetter)
.setTableAttribute(SemanticAttributes.DB_CASSANDRA_TABLE)
.setStatementSanitizationEnabled(
SimoneGiusso marked this conversation as resolved.
Show resolved Hide resolved
CommonConfig.get().isStatementSanitizationEnabled())
.build())
.addAttributesExtractor(
NetClientAttributesExtractor.create(new CassandraNetAttributesGetter()))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
* SPDX-License-Identifier: Apache-2.0
*/

package io.opentelemetry.javaagent.instrumentation.cassandra.v4_0;
package io.opentelemetry.instrumentation.cassandra.v4_0;

import com.datastax.oss.driver.api.core.CqlIdentifier;
import io.opentelemetry.instrumentation.api.instrumenter.db.SqlClientAttributesGetter;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,9 +3,9 @@
* SPDX-License-Identifier: Apache-2.0
*/

package io.opentelemetry.javaagent.instrumentation.cassandra.v4_0;
package io.opentelemetry.instrumentation.cassandra.v4_0;

import static io.opentelemetry.javaagent.instrumentation.cassandra.v4_0.CassandraSingletons.instrumenter;
import static io.opentelemetry.instrumentation.cassandra.v4_0.CassandraSingletons.instrumenter;

import com.datastax.oss.driver.api.core.CqlIdentifier;
import com.datastax.oss.driver.api.core.CqlSession;
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
plugins {
id("otel.javaagent-instrumentation")
}

muzzle {
pass {
group.set("com.datastax.oss")
module.set("java-driver-core")
versions.set("[4.14,)")
SimoneGiusso marked this conversation as resolved.
Show resolved Hide resolved
assertInverse.set(true)
}
}

dependencies {
implementation(project(":instrumentation:cassandra:cassandra-4.0:library"))

library("com.datastax.oss:java-driver-core:4.14.1")

compileOnly("com.google.auto.value:auto-value-annotations")
annotationProcessor("com.google.auto.value:auto-value")

testImplementation("io.projectreactor:reactor-core:3.4.21")
}

tasks {
test {
usesService(gradle.sharedServices.registrations["testcontainersBuildService"].getService())
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
/*
* Copyright The OpenTelemetry Authors
* SPDX-License-Identifier: Apache-2.0
*/

package io.opentelemetry.javaagent.instrumentation.cassandra.v4_14;

import static java.util.Collections.singletonList;

import com.google.auto.service.AutoService;
import io.opentelemetry.javaagent.extension.instrumentation.InstrumentationModule;
import io.opentelemetry.javaagent.extension.instrumentation.TypeInstrumentation;
import java.util.List;

@AutoService(InstrumentationModule.class)
public class CassandraClientInstrumentationModule extends InstrumentationModule {
public CassandraClientInstrumentationModule() {
super("cassandra", "cassandra-4.14");
}

SimoneGiusso marked this conversation as resolved.
Show resolved Hide resolved
@Override
public List<TypeInstrumentation> typeInstrumentations() {
return singletonList(new SessionBuilderInstrumentation());
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
/*
* Copyright The OpenTelemetry Authors
* SPDX-License-Identifier: Apache-2.0
*/

package io.opentelemetry.javaagent.instrumentation.cassandra.v4_14;

import com.datastax.oss.driver.api.core.CqlSession;
import java.util.function.Function;

public class CompletionStageFunction implements Function<Object, Object> {

@Override
public Object apply(Object session) {
if (session == null) {
return null;
}
// This should cover ours and OT's TracingCqlSession
if (session.getClass().getName().endsWith("cassandra4.TracingCqlSession")) {
return session;
}
return new TracingCqlSession((CqlSession) session);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,53 @@
/*
* Copyright The OpenTelemetry Authors
* SPDX-License-Identifier: Apache-2.0
*/

package io.opentelemetry.javaagent.instrumentation.cassandra.v4_14;

import static net.bytebuddy.matcher.ElementMatchers.isMethod;
import static net.bytebuddy.matcher.ElementMatchers.isPublic;
import static net.bytebuddy.matcher.ElementMatchers.named;
import static net.bytebuddy.matcher.ElementMatchers.takesArguments;

import io.opentelemetry.javaagent.extension.instrumentation.TypeInstrumentation;
import io.opentelemetry.javaagent.extension.instrumentation.TypeTransformer;
import java.util.concurrent.CompletionStage;
import net.bytebuddy.asm.Advice;
import net.bytebuddy.description.type.TypeDescription;
import net.bytebuddy.matcher.ElementMatcher;

public class SessionBuilderInstrumentation implements TypeInstrumentation {

@Override
public ElementMatcher<TypeDescription> typeMatcher() {
// Note: Cassandra has a large driver and we instrument single class in it.
// The rest is ignored in AdditionalLibraryIgnoresMatcher
return named("com.datastax.oss.driver.api.core.session.SessionBuilder");
}

@Override
public void transform(TypeTransformer transformer) {
transformer.applyAdviceToMethod(
isMethod().and(isPublic()).and(named("buildAsync")).and(takesArguments(0)),
SessionBuilderInstrumentation.class.getName() + "$BuildAdvice");
}

@SuppressWarnings("unused")
public static class BuildAdvice {

/**
* Strategy: each time we build a connection to a Cassandra cluster, the
* com.datastax.oss.driver.api.core.session.SessionBuilder.buildAsync() method is called. The
* opentracing contribution is a simple wrapper, so we just have to wrap the new session.
*
* @param stage The fresh CompletionStage to patch. This stage produces session which is
* replaced with new session
*/
@Advice.OnMethodExit(suppress = Throwable.class)
public static void injectTracingSession(
@Advice.Return(readOnly = false) CompletionStage<?> stage) {
stage = stage.thenApply(new CompletionStageFunction());
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
/*
* Copyright The OpenTelemetry Authors
* SPDX-License-Identifier: Apache-2.0
*/

package io.opentelemetry.javaagent.instrumentation.cassandra.v4_14;

import com.datastax.dse.driver.api.core.cql.reactive.ReactiveResultSet;
import com.datastax.dse.driver.internal.core.cql.reactive.DefaultReactiveResultSet;
import com.datastax.oss.driver.api.core.CqlSession;
import com.datastax.oss.driver.api.core.cql.Statement;

public class TracingCqlSession
extends io.opentelemetry.instrumentation.cassandra.v4_0.TracingCqlSession {
SimoneGiusso marked this conversation as resolved.
Show resolved Hide resolved

public TracingCqlSession(CqlSession session) {
super(session);
}

@Override
public ReactiveResultSet executeReactive(Statement<?> statement) {
return new DefaultReactiveResultSet(() -> executeAsync(statement));
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,129 @@
/*
* Copyright The OpenTelemetry Authors
* SPDX-License-Identifier: Apache-2.0
*/

import com.datastax.oss.driver.api.core.CqlSession
import com.datastax.oss.driver.api.core.config.DefaultDriverOption
import com.datastax.oss.driver.api.core.config.DriverConfigLoader
import com.datastax.oss.driver.internal.core.config.typesafe.DefaultDriverConfigLoader
import io.opentelemetry.api.trace.SpanKind
import io.opentelemetry.instrumentation.test.AgentInstrumentationSpecification
import io.opentelemetry.instrumentation.test.asserts.TraceAssert
import io.opentelemetry.sdk.trace.data.SpanData
import io.opentelemetry.semconv.trace.attributes.SemanticAttributes
import reactor.core.publisher.Flux
import org.slf4j.Logger
import org.slf4j.LoggerFactory
import org.testcontainers.containers.GenericContainer
import org.testcontainers.containers.output.Slf4jLogConsumer
import spock.lang.Shared

import java.time.Duration

import static io.opentelemetry.api.trace.SpanKind.CLIENT

class CassandraClientTest extends AgentInstrumentationSpecification {
SimoneGiusso marked this conversation as resolved.
Show resolved Hide resolved
private static final Logger logger = LoggerFactory.getLogger(CassandraClientTest)

@Shared
GenericContainer cassandra
@Shared
int cassandraPort

def setupSpec() {
cassandra = new GenericContainer("cassandra:4.0")
// limit memory usage
.withEnv("JVM_OPTS", "-Xmx128m -Xms128m")
.withExposedPorts(9042)
.withLogConsumer(new Slf4jLogConsumer(logger))
.withStartupTimeout(Duration.ofSeconds(120))
cassandra.start()

cassandraPort = cassandra.getMappedPort(9042)
}

def cleanupSpec() {
cassandra.stop()
}

def "test reactive"() {
setup:
CqlSession session = getSession(keyspace)

runWithSpan("parent") {
Flux.from(session.executeReactive(statement)).doOnComplete({ result ->
runWithSpan("child") {}
}).blockLast()
}

expect:
assertTraces(1) {
trace(0, 3) {
span(0) {
name "parent"
kind SpanKind.INTERNAL
hasNoParent()
}
cassandraSpan(it, 1, spanName, expectedStatement, operation, keyspace, table, span(0))
span(2) {
name "child"
kind SpanKind.INTERNAL
childOf span(0)
}
}
}

cleanup:
session.close()

where:
keyspace | statement | expectedStatement | spanName | operation | table
null | "DROP KEYSPACE IF EXISTS reactive_test" | "DROP KEYSPACE IF EXISTS reactive_test" | "DB Query" | null | null
null | "CREATE KEYSPACE reactive_test WITH REPLICATION = {'class':'SimpleStrategy', 'replication_factor':3}" | "CREATE KEYSPACE reactive_test WITH REPLICATION = {?:?, ?:?}" | "DB Query" | null | null
"reactive_test" | "CREATE TABLE reactive_test.users ( id UUID PRIMARY KEY, name text )" | "CREATE TABLE reactive_test.users ( id UUID PRIMARY KEY, name text )" | "reactive_test" | null | null
"reactive_test" | "INSERT INTO reactive_test.users (id, name) values (uuid(), 'alice')" | "INSERT INTO reactive_test.users (id, name) values (uuid(), ?)" | "INSERT reactive_test.users" | "INSERT" | "reactive_test.users"
"reactive_test" | "SELECT * FROM users where name = 'alice' ALLOW FILTERING" | "SELECT * FROM users where name = ? ALLOW FILTERING" | "SELECT reactive_test.users" | "SELECT" | "users"
}

def cassandraSpan(TraceAssert trace, int index, String spanName, String statement, String operation, String keyspace, String table, Object parentSpan = null) {
trace.span(index) {
name spanName
kind CLIENT
if (parentSpan == null) {
hasNoParent()
} else {
childOf((SpanData) parentSpan)
}
attributes {
"$SemanticAttributes.NET_PEER_NAME" "localhost"
"$SemanticAttributes.NET_PEER_IP" "127.0.0.1"
"$SemanticAttributes.NET_PEER_PORT" cassandraPort
"$SemanticAttributes.DB_SYSTEM" "cassandra"
"$SemanticAttributes.DB_NAME" keyspace
"$SemanticAttributes.DB_STATEMENT" statement
"$SemanticAttributes.DB_OPERATION" operation
"$SemanticAttributes.DB_CASSANDRA_CONSISTENCY_LEVEL" "LOCAL_ONE"
"$SemanticAttributes.DB_CASSANDRA_COORDINATOR_DC" "datacenter1"
"$SemanticAttributes.DB_CASSANDRA_COORDINATOR_ID" String
"$SemanticAttributes.DB_CASSANDRA_IDEMPOTENCE" Boolean
"$SemanticAttributes.DB_CASSANDRA_PAGE_SIZE" 5000
"$SemanticAttributes.DB_CASSANDRA_SPECULATIVE_EXECUTION_COUNT" 0
// the SqlStatementSanitizer can't handle CREATE statements yet
"$SemanticAttributes.DB_CASSANDRA_TABLE" table
}
}
}

def getSession(String keyspace) {
DriverConfigLoader configLoader = DefaultDriverConfigLoader.builder()
.withDuration(DefaultDriverOption.REQUEST_TIMEOUT, Duration.ofSeconds(0))
.build()
return CqlSession.builder()
.addContactPoint(new InetSocketAddress("localhost", cassandraPort))
.withConfigLoader(configLoader)
.withLocalDatacenter("datacenter1")
.withKeyspace((String) keyspace)
.build()
}
}
Loading