Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

RUMM-2780: Synchronize access to DatadogRumMonitor#rootScope when processing fatal error #1186

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -392,8 +392,10 @@ internal class DatadogRumMonitor(

internal fun handleEvent(event: RumRawEvent) {
if (event is RumRawEvent.AddError && event.isFatal) {
@Suppress("ThreadSafety") // Crash handling, can't delegate to another thread
rootScope.handleEvent(event, writer)
synchronized(rootScope) {
@Suppress("ThreadSafety") // Crash handling, can't delegate to another thread
rootScope.handleEvent(event, writer)
}
} else if (event is RumRawEvent.SendTelemetry) {
@Suppress("ThreadSafety") // TODO RUMM-1503 delegate to another thread
telemetryEventHandler.handleEvent(event, writer)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ import com.datadog.tools.unit.forge.aThrowable
import com.nhaarman.mockitokotlin2.any
import com.nhaarman.mockitokotlin2.argThat
import com.nhaarman.mockitokotlin2.argumentCaptor
import com.nhaarman.mockitokotlin2.doAnswer
import com.nhaarman.mockitokotlin2.doReturn
import com.nhaarman.mockitokotlin2.eq
import com.nhaarman.mockitokotlin2.inOrder
Expand All @@ -65,6 +66,7 @@ import org.assertj.core.api.Assertions.assertThat
import org.junit.jupiter.api.BeforeEach
import org.junit.jupiter.api.RepeatedTest
import org.junit.jupiter.api.Test
import org.junit.jupiter.api.assertDoesNotThrow
import org.junit.jupiter.api.extension.ExtendWith
import org.junit.jupiter.api.extension.Extensions
import org.junit.jupiter.params.ParameterizedTest
Expand All @@ -74,6 +76,8 @@ import org.mockito.junit.jupiter.MockitoExtension
import org.mockito.junit.jupiter.MockitoSettings
import org.mockito.quality.Strictness
import java.util.concurrent.ExecutorService
import java.util.concurrent.Executors
import java.util.concurrent.Future
import java.util.concurrent.LinkedBlockingQueue
import java.util.concurrent.ScheduledThreadPoolExecutor
import java.util.concurrent.TimeUnit
Expand Down Expand Up @@ -1528,6 +1532,107 @@ internal class DatadogRumMonitorTest {
}
}

@Test
fun `M allow only one thread inside rootScope#handleEvent at the time W handleEvent()`(
forge: Forge
) {
// Given
val mockExecutorService = mock<ExecutorService>().apply {
// this is the mock for the inner ExecutorService, Futures returned by this one
// are never used in the code
whenever(submit(any())) doAnswer {
it.getArgument<Runnable>(0).run()
object : Future<Any> {
override fun cancel(mayInterruptIfRunning: Boolean) =
error("Not supposed to be called")

override fun isCancelled(): Boolean = error("Not supposed to be called")
override fun isDone(): Boolean = error("Not supposed to be called")
override fun get(): Any = error("Not supposed to be called")
override fun get(timeout: Long, unit: TimeUnit?): Any =
error("Not supposed to be called")
}
}
}

testedMonitor = DatadogRumMonitor(
fakeApplicationId,
mockSdkCore,
fakeSamplingRate,
fakeBackgroundTrackingEnabled,
fakeTrackFrustrations,
mockWriter,
mockHandler,
mockTelemetryEventHandler,
mockDetector,
mockCpuVitalMonitor,
mockMemoryVitalMonitor,
mockFrameRateVitalMonitor,
mockSessionListener,
mockContextProvider,
executorService = mockExecutorService
)

var isMethodOccupied = false
val mockRootScope = mock<RumScope>().apply {
whenever(handleEvent(any(), any())) doAnswer {
if (isMethodOccupied) {
throw IllegalStateException(
"Only one thread should" +
" be allowed to enter rootScope at the time."
)
}
isMethodOccupied = true
Thread.sleep(100)
isMethodOccupied = false
null
}
}
testedMonitor.rootScope = mockRootScope
// this is another executor, to imitate a bunch of external concurrent
// calls to DatadogRumMonitor
val executor = Executors.newFixedThreadPool(10)
val futures = mutableListOf<Future<*>>()

// When
repeat(10) {
futures += executor.submit {
// we are not going to generate all set of the events, only AddError + fatal
// which has a special handling + few simple others
val event = forge.anElementFrom(
RumRawEvent.AddError(
message = forge.anAlphaNumericalString(),
source = forge.aValueFrom(RumErrorSource::class.java),
isFatal = true,
throwable = forge.aThrowable(),
stacktrace = forge.anAlphaNumericalString(),
attributes = emptyMap()
),
RumRawEvent.StartAction(
type = forge.aValueFrom(RumActionType::class.java),
name = forge.anAlphaNumericalString(),
waitForStop = forge.aBool(),
attributes = emptyMap()
),
RumRawEvent.StartView(
key = Any(),
name = forge.anAlphaNumericalString(),
attributes = emptyMap()
)
)
testedMonitor.handleEvent(event)
}
}

// Then
assertDoesNotThrow {
futures.forEach {
// none of these should throw
it.get()
0xnm marked this conversation as resolved.
Show resolved Hide resolved
}
}
}

companion object {
const val TIMESTAMP_MIN = 1000000000000
const val TIMESTAMP_MAX = 2000000000000
Expand Down