Skip to content

Commit

Permalink
Fix zio-grpc server streams call trace
Browse files Browse the repository at this point in the history
  • Loading branch information
jxnu-liguobin committed Jan 24, 2024
1 parent 85a6956 commit dade906
Show file tree
Hide file tree
Showing 4 changed files with 22 additions and 50 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -5,23 +5,19 @@ import net.bytebuddy.matcher.ElementMatcher
import net.bytebuddy.matcher.ElementMatchers.*

import org.apache.skywalking.apm.agent.core.plugin.`match`.*
import org.apache.skywalking.apm.agent.core.plugin.`match`.logical.LogicalMatchOperation
import org.apache.skywalking.apm.agent.core.plugin.interceptor.*
import org.apache.skywalking.apm.agent.core.plugin.interceptor.enhance.*

final class ZioGrpcServerCallInstrumentation extends ClassInstanceMethodsEnhancePluginDefine:

import ZioGrpcServerCallInstrumentation.*

override def enhanceClass(): ClassMatch = NameMatch.byName(ENHANCE_CLASS)
override def enhanceClass(): ClassMatch = ENHANCE_CLASS

override def witnessClasses(): Array[String] = Array("scalapb.zio_grpc.ServerImpl")

override def getConstructorsInterceptPoints: Array[ConstructorInterceptPoint] = Array(
new ConstructorInterceptPoint:
override def getConstructorMatcher: ElementMatcher[MethodDescription] = takesArguments(2)

override def getConstructorInterceptor: String = CLASS_INTERCEPTOR
)
override def getConstructorsInterceptPoints: Array[ConstructorInterceptPoint] = null

override def getInstanceMethodsInterceptPoints: Array[InstanceMethodsInterceptPoint] =
methodInterceptors
Expand All @@ -38,16 +34,18 @@ end ZioGrpcServerCallInstrumentation

object ZioGrpcServerCallInstrumentation:

final val CLASS_INTERCEPTOR =
"org.bitlap.skywalking.apm.plugin.ziogrpc.v06rcx.interceptor.ZioGrpcServerCallConstructorInterceptor"

final val CLOSE_METHOD_INTERCEPTOR =
"org.bitlap.skywalking.apm.plugin.ziogrpc.v06rcx.interceptor.ZioGrpcServerCloseInterceptor"

final val SEND_MESSAGE_METHOD_INTERCEPTOR =
"org.bitlap.skywalking.apm.plugin.ziogrpc.v06rcx.interceptor.ZioGrpcServerSendMessageInterceptor"

private final val ENHANCE_CLASS: String = "scalapb.zio_grpc.server.ZServerCall"
// see issue: https://github.com/scalapb/zio-grpc/issues/501, we cannot use Server Interceptor
// Because the server stream call calls `ServerCall.sendMessage` and `ServerCall.close`, and we intercept grpc directly.
private final val ENHANCE_CLASS = LogicalMatchOperation.or(
HierarchyMatch.byHierarchyMatch("io.grpc.ServerCall"),
MultiClassNameMatch.byMultiClassMatch("io.grpc.ServerCall")
)

val methodInterceptors: Map[String, ElementMatcher[MethodDescription]] = Map(
CLOSE_METHOD_INTERCEPTOR -> named("close"),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,14 +2,9 @@ package org.bitlap.skywalking.apm.plugin.ziogrpc.v06rcx.interceptor

import java.lang.reflect.Method

import scala.util.*
import scalapb.zio_grpc.*

import io.grpc.*
import io.grpc.Status.*

import zio.*

import org.apache.skywalking.apm.agent.core.context.*
import org.apache.skywalking.apm.agent.core.context.tag.Tags
import org.apache.skywalking.apm.agent.core.context.trace.*
Expand All @@ -28,11 +23,8 @@ final class ZioGrpcServerCloseInterceptor extends InstanceMethodsAroundIntercept
argumentsTypes: Array[Class[?]],
result: MethodInterceptResult
): Unit =
val ctx = objInst.getSkyWalkingDynamicField
if ctx == null || !ctx.isInstanceOf[OperationContext] then return

val cx = ctx.asInstanceOf[OperationContext]
val context = OperationContext.remove(cx.selfCall)
val call = objInst.asInstanceOf[ServerCall[?, ?]]
val context = OperationContext.remove(call)
if context == null then return

val span = beforeClose(context.contextSnapshot, context.methodDescriptor)
Expand Down Expand Up @@ -63,31 +55,22 @@ final class ZioGrpcServerCloseInterceptor extends InstanceMethodsAroundIntercept
if span == null || !span.isInstanceOf[AbstractSpan] then return ret

val status = allArguments(0).asInstanceOf[Status]
ret.asInstanceOf[GIO[Unit]].ensuring(ZIO.attempt(afterClose(status, ctx.asyncSpan, span)).ignoreLogged)
afterClose(status, ctx.asyncSpan, span)
ret
end afterMethod

private def afterClose(status: Status, asyncSpan: AbstractSpan, span: AbstractSpan): Unit =
status match {
case OK =>
case UNKNOWN =>
case INTERNAL =>
case UNKNOWN | INTERNAL =>
if status.getCause == null then span.log(status.asRuntimeException)
else span.log(status.getCause)
case _ =>
if status.getCause != null then span.log(status.getCause)
}
Tags.RPC_RESPONSE_STATUS_CODE.set(span, status.getCode.name)
try span.asyncFinish
catch {
case t: Throwable =>
ContextManager.activeSpan.log(t)
} finally
try
asyncSpan.asyncFinish()
catch {
case ignore: Throwable =>
}
ContextManager.stopSpan()
AgentUtils.stopAsync(span)
AgentUtils.stopAsync(asyncSpan)
ContextManager.stopSpan()

override def handleMethodException(
objInst: EnhancedInstance,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,12 +2,8 @@ package org.bitlap.skywalking.apm.plugin.ziogrpc.v06rcx.interceptor

import java.lang.reflect.Method

import scalapb.zio_grpc.*

import io.grpc.*

import zio.ZIO

import org.apache.skywalking.apm.agent.core.context.*
import org.apache.skywalking.apm.agent.core.context.trace.*
import org.apache.skywalking.apm.agent.core.plugin.interceptor.enhance.*
Expand All @@ -24,11 +20,8 @@ final class ZioGrpcServerSendMessageInterceptor extends InstanceMethodsAroundInt
argumentsTypes: Array[Class[?]],
result: MethodInterceptResult
): Unit =
val ctx = objInst.getSkyWalkingDynamicField
if ctx == null || !ctx.isInstanceOf[OperationContext] then return

val cx = ctx.asInstanceOf[OperationContext]
val context = OperationContext.get(cx.selfCall)
val call = objInst.asInstanceOf[ServerCall[?, ?]]
val context = OperationContext.get(call)
if context == null then return
val span = beforeSendMessage(context.contextSnapshot, context.methodDescriptor)
span.foreach(s => objInst.setSkyWalkingDynamicField(context.copy(activeSpan = s)))
Expand Down Expand Up @@ -61,12 +54,10 @@ final class ZioGrpcServerSendMessageInterceptor extends InstanceMethodsAroundInt
if context == null || !context.isInstanceOf[OperationContext] then return ret

val ctx = context.asInstanceOf[OperationContext]

if ctx.activeSpan == null then return ret

AgentUtils.stopAsync(ctx.activeSpan)
ContextManager.stopSpan()
ret
.asInstanceOf[GIO[Unit]]
.ensuring(ZIO.attempt { ctx.activeSpan.asyncFinish(); ContextManager.stopSpan() }.ignoreLogged)

end afterMethod

override def handleMethodException(
Expand Down
Binary file not shown.

0 comments on commit dade906

Please sign in to comment.