Skip to content

Commit

Permalink
fix(anr): immediately run any tasks submitted to the same background …
Browse files Browse the repository at this point in the history
…queue submitting the work to avoid possible thread starvation
  • Loading branch information
lemnik committed Oct 17, 2022
1 parent 4def9ac commit e898f68
Show file tree
Hide file tree
Showing 3 changed files with 89 additions and 19 deletions.
9 changes: 9 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,5 +1,14 @@
# Changelog

## TBD

### Bug fixes

* Fixed rare thread-starvation issue where some internal failures could lead to deadlocks. This was most noticeable
when attempting to call Bugsnag.start on an architecture (ABI) that was not packaged in the APK, and lead to an
ANR instead of an error report.
[#1768](https://github.com/bugsnag/bugsnag-android/pull/1768)

## 5.28.0 (2022-10-13)

### Enhancements
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,13 +3,16 @@ package com.bugsnag.android
import androidx.annotation.VisibleForTesting
import java.util.concurrent.BlockingQueue
import java.util.concurrent.Callable
import java.util.concurrent.ExecutorService
import java.util.concurrent.Executors
import java.util.concurrent.Future
import java.util.concurrent.FutureTask
import java.util.concurrent.LinkedBlockingQueue
import java.util.concurrent.RejectedExecutionException
import java.util.concurrent.ThreadFactory
import java.util.concurrent.ThreadPoolExecutor
import java.util.concurrent.TimeUnit
import java.lang.Thread as JThread

/**
* The type of task which is being submitted. This determines which execution queue
Expand Down Expand Up @@ -55,9 +58,14 @@ private const val THREAD_POOL_SIZE = 1
private const val KEEP_ALIVE_SECS = 30L
private const val TASK_QUEUE_SIZE = 128

internal fun createExecutor(name: String, keepAlive: Boolean): ThreadPoolExecutor {
private class TaskTypeThread(runnable: Runnable, name: String, val taskType: TaskType) :
JThread(runnable, name)

internal val JThread.taskType get() = (this as? TaskTypeThread)?.taskType

internal fun createExecutor(name: String, type: TaskType, keepAlive: Boolean): ExecutorService {
val queue: BlockingQueue<Runnable> = LinkedBlockingQueue(TASK_QUEUE_SIZE)
val threadFactory = ThreadFactory { Thread(it, name) }
val threadFactory = ThreadFactory { TaskTypeThread(it, name, type) }

// certain executors (error/session/io) should always keep their threads alive, but others
// are less important so are allowed a pool size of 0 that expands on demand.
Expand Down Expand Up @@ -86,33 +94,38 @@ internal fun createExecutor(name: String, keepAlive: Boolean): ThreadPoolExecuto
internal class BackgroundTaskService(
// these executors must remain single-threaded - the SDK makes assumptions
// about synchronization based on this.
@VisibleForTesting
internal val errorExecutor: ThreadPoolExecutor = createExecutor(
@get:VisibleForTesting
internal val errorExecutor: ExecutorService = createExecutor(
"Bugsnag Error thread",
TaskType.ERROR_REQUEST,
true
),

@VisibleForTesting
internal val sessionExecutor: ThreadPoolExecutor = createExecutor(
@get:VisibleForTesting
internal val sessionExecutor: ExecutorService = createExecutor(
"Bugsnag Session thread",
TaskType.SESSION_REQUEST,
true
),

@VisibleForTesting
internal val ioExecutor: ThreadPoolExecutor = createExecutor(
@get:VisibleForTesting
internal val ioExecutor: ExecutorService = createExecutor(
"Bugsnag IO thread",
TaskType.IO,
true
),

@VisibleForTesting
internal val internalReportExecutor: ThreadPoolExecutor = createExecutor(
@get:VisibleForTesting
internal val internalReportExecutor: ExecutorService = createExecutor(
"Bugsnag Internal Report thread",
TaskType.INTERNAL_REPORT,
false
),

@VisibleForTesting
internal val defaultExecutor: ThreadPoolExecutor = createExecutor(
@get:VisibleForTesting
internal val defaultExecutor: ExecutorService = createExecutor(
"Bugsnag Default thread",
TaskType.DEFAULT,
false
)
) {
Expand All @@ -138,13 +151,17 @@ internal class BackgroundTaskService(
*/
@Throws(RejectedExecutionException::class)
fun <T> submitTask(taskType: TaskType, callable: Callable<T>): Future<T> {
return when (taskType) {
TaskType.ERROR_REQUEST -> errorExecutor.submit(callable)
TaskType.SESSION_REQUEST -> sessionExecutor.submit(callable)
TaskType.IO -> ioExecutor.submit(callable)
TaskType.INTERNAL_REPORT -> internalReportExecutor.submit(callable)
TaskType.DEFAULT -> defaultExecutor.submit(callable)
val task = FutureTask(callable)

when (taskType) {
TaskType.ERROR_REQUEST -> errorExecutor.execute(task)
TaskType.SESSION_REQUEST -> sessionExecutor.execute(task)
TaskType.IO -> ioExecutor.execute(task)
TaskType.INTERNAL_REPORT -> internalReportExecutor.execute(task)
TaskType.DEFAULT -> defaultExecutor.execute(task)
}

return SafeFuture(task, taskType)
}

/**
Expand All @@ -168,11 +185,34 @@ internal class BackgroundTaskService(
ioExecutor.awaitTerminationSafe()
}

private fun ThreadPoolExecutor.awaitTerminationSafe() {
private fun ExecutorService.awaitTerminationSafe() {
try {
awaitTermination(SHUTDOWN_WAIT_MS, TimeUnit.MILLISECONDS)
} catch (ignored: InterruptedException) {
// ignore interrupted exception as the JVM is shutting down
}
}

private class SafeFuture<V>(
private val delegate: FutureTask<V>,
private val taskType: TaskType
) : Future<V> by delegate {
override fun get(): V {
ensureTaskGetSafe()
return delegate.get()
}

override fun get(timeout: Long, unit: TimeUnit?): V {
ensureTaskGetSafe()
return delegate.get(timeout, unit)
}

private fun ensureTaskGetSafe() {
if (!delegate.isDone && JThread.currentThread().taskType == taskType) {
// if this is the execution queue for the wrapped FutureTask && it is not yet 'done'
// then it has not yet been started, so we run it immediately
delegate.run()
}
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -136,6 +136,27 @@ internal class BackgroundTaskServiceTest {
assertRejectedExecution(service, TaskType.IO)
}

/**
* Test that tasks submitted to the same queue within a task work without deadlocking the queue.
* This has a 5 second timeout since the symptom of a failure is thread-starvation / deadlock.
*/
@Test(timeout = 5_000)
fun testSubmitOnSubmit() {
val service = BackgroundTaskService()

TaskType.values().forEach { taskType ->
val result = service.submitTask<Pair<TaskType, String>>(taskType) {
service.submitTask<Pair<TaskType, String>>(taskType) {
taskType to "done"
}.get()
}.get()

assertEquals(taskType to "done", result)
}

service.shutdown()
}

private fun submitBlockingJob(
service: BackgroundTaskService,
latch: CountDownLatch,
Expand Down

0 comments on commit e898f68

Please sign in to comment.