Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Execution hangs in test with channels #283

Open
avpotapov00 opened this issue Feb 23, 2024 · 1 comment
Open

Execution hangs in test with channels #283

avpotapov00 opened this issue Feb 23, 2024 · 1 comment
Assignees
Labels
bug Something isn't working

Comments

@avpotapov00
Copy link
Collaborator

Please consider the following test.
The error reproduces with Lincheck version 2.26 and produces the following output:
image

The test:

@file:OptIn(InternalCoroutinesApi::class)
/*
 * Lincheck
 *
 * Copyright (C) 2019 - 2024 JetBrains s.r.o.
 *
 * This Source Code Form is subject to the terms of the
 * Mozilla Public License, v. 2.0. If a copy of the MPL was not distributed
 * with this file, You can obtain one at http://mozilla.org/MPL/2.0/.
 */
package org.jetbrains.kotlinx.lincheck_test

import kotlinx.coroutines.*
import kotlinx.coroutines.channels.*
import kotlinx.coroutines.selects.SelectClause1
import kotlinx.coroutines.selects.select
import org.jetbrains.kotlinx.lincheck.Options
import org.jetbrains.kotlinx.lincheck.annotations.Operation
import org.jetbrains.kotlinx.lincheck.annotations.Param
import org.jetbrains.kotlinx.lincheck.check
import org.jetbrains.kotlinx.lincheck.paramgen.IntGen
import org.jetbrains.kotlinx.lincheck.strategy.managed.modelchecking.ModelCheckingOptions
import org.junit.Test

/**
 * Runs the [ChannelLincheckTestBase] operations against a [BroadcastChannel]
 * created with capacity 1 and wrapped in [ChannelViaBroadcast], checked against
 * [SequentialBuffered1Channel]. Obstruction-freedom checking is switched off.
 */
class Buffered1BroadcastChannelLincheckTest : ChannelLincheckTestBase(
    sequentialSpecification = SequentialBuffered1Channel::class.java,
    obstructionFree = false,
    c = ChannelViaBroadcast(BroadcastChannel(1))
)

/**
 * Runs the [ChannelLincheckTestBase] operations against a [BroadcastChannel]
 * created with capacity 2 and wrapped in [ChannelViaBroadcast], checked against
 * [SequentialBuffered2Channel]. Obstruction-freedom checking is switched off.
 */
class Buffered2BroadcastChannelLincheckTest : ChannelLincheckTestBase(
    sequentialSpecification = SequentialBuffered2Channel::class.java,
    obstructionFree = false,
    c = ChannelViaBroadcast(BroadcastChannel(2))
)

/**
 * Base class for the Lincheck channel tests in this reproducer.
 *
 * Exposes the operations of the channel [c] as Lincheck `@Operation`s and runs them
 * through [modelCheckingTest]. Close/cancel causes are [NumberedCancellationException]s,
 * and operations that observe such a cause return its `testResult` string instead of
 * throwing, so the outcome can be compared as a plain value.
 *
 * Two named integer parameter generators are declared: `"value"` for sent elements and
 * `"closeToken"` for close/cancel tokens, both configured with `"1:9"`
 * (presumably the inclusive range 1..9 - confirm against the IntGen documentation).
 *
 * NOTE(review): [sequentialSpecification] is only read inside [customize], and nothing in
 * the visible code calls [customize]; confirm whether [modelCheckingTest] was meant to apply it.
 *
 * @param c the channel under test.
 * @param sequentialSpecification class passed to `Options.sequentialSpecification` by [customize].
 * @param obstructionFree value passed to `checkObstructionFreedom` in [modelCheckingTest].
 */
@Param.Params(
    Param(name = "value", gen = IntGen::class, conf = "1:9"),
    Param(name = "closeToken", gen = IntGen::class, conf = "1:9")
)
abstract class ChannelLincheckTestBase(
    protected val c: Channel<Int>,
    private val sequentialSpecification: Class<*>,
    private val obstructionFree: Boolean = true
) {

    /**
     * Sends [value] to the channel.
     *
     * @return the result of `c.send(value)`, or the `testResult` of the
     * [NumberedCancellationException] if one is thrown.
     */
    @Operation(allowExtraSuspension = true, blocking = true)
    suspend fun send(@Param(name = "value") value: Int): Any = try {
//        uselessField.incrementAndGet()
        c.send(value)
    } catch (e: NumberedCancellationException) {
        e.testResult
    }

    /**
     * Receives one element from the channel.
     *
     * @return the received element, or the `testResult` of the
     * [NumberedCancellationException] if one is thrown.
     */
    @Operation(allowExtraSuspension = true, blocking = true)
    suspend fun receive(): Any = try {
        c.receive()
    } catch (e: NumberedCancellationException) {
        e.testResult
    }

    /**
     * Receives via `receiveCatching`.
     *
     * @return the element on success; when the result is closed, the `testResult` of the
     * close cause. The cause is cast unconditionally, so a `null` or differently typed
     * cause would fail the cast.
     */
    @Operation(allowExtraSuspension = true, blocking = true)
    suspend fun receiveCatching(): Any = c.receiveCatching()
        .onSuccess { return it }
        .onClosed { e -> return (e as NumberedCancellationException).testResult }

    /**
     * Non-suspending receive attempt.
     *
     * @return the element on success; on failure, the `testResult` of the cause if it is a
     * [NumberedCancellationException], otherwise `null`.
     */
    @Operation(blocking = true)
    fun tryReceive(): Any? =
        c.tryReceive()
            .onSuccess { return it }
            .onFailure { return if (it is NumberedCancellationException) it.testResult else null }

    /**
     * Receives one element through a `select` expression with a single `onReceive` clause.
     *
     * @return the received element, or the `testResult` of the
     * [NumberedCancellationException] if one is thrown.
     */
    @Operation(allowExtraSuspension = true, blocking = true)
    suspend fun receiveViaSelect(): Any = try {
        select<Int> { c.onReceive { it } }
    } catch (e: NumberedCancellationException) {
        e.testResult
    }

    /** Closes the channel with a [NumberedCancellationException] built from [token]; returns the result of `c.close`. */
    @Operation(causesBlocking = true, blocking = true)
    fun close(@Param(name = "closeToken") token: Int): Boolean = c.close(NumberedCancellationException(token))

    /** Cancels the channel with a [NumberedCancellationException] built from [token]. */
    @Operation(causesBlocking = true, blocking = true)
    fun cancel(@Param(name = "closeToken") token: Int) = c.cancel(NumberedCancellationException(token))

    /** Returns `c.isClosedForSend`. */
    @Operation(blocking = true)
    fun isClosedForSend() = c.isClosedForSend

    /**
     * Configures Lincheck options with zero "before" actors, the [sequentialSpecification],
     * and one custom scenario of three parallel threads with two actors each:
     * `receiveCatching`/`cancel(5)`, `send(5)`/`receive`, and `receive`/`close(5)`.
     *
     * NOTE(review): the [isStressTest] parameter is not used in the body, and this function
     * is not invoked anywhere in the visible code - confirm whether that is intentional.
     */
    fun <O : Options<O, *>> O.customize(isStressTest: Boolean) =
        actorsBefore(0).sequentialSpecification(sequentialSpecification)
            .addCustomScenario {
                parallel {
                    thread {
                        actor(::receiveCatching)
                        actor(::cancel, 5)
                    }
                    thread {
                        actor(::send, 5)
                        actor(::receive)
                    }
                    thread {
                        actor(::receive)
                        actor(::close, 5)
                    }
                }
            }


    /**
     * Runs Lincheck model checking on the concrete test class (`this::class`).
     *
     * Iteration counts and actor counts are chosen from the file-level `isStressTest` flag;
     * three threads are always used and scenario minimization is disabled.
     */
    @Test
    fun modelCheckingTest() = ModelCheckingOptions()
        .iterations(if (isStressTest) 200 else 20)
        .invocationsPerIteration(if (isStressTest) 10_000 else 1_000)
        .actorsBefore(if (isStressTest) 3 else 1)
        .threads(3)
        .actorsPerThread(if (isStressTest) 3 else 2)
        .actorsAfter(if (isStressTest) 3 else 0)
        .checkObstructionFreedom(obstructionFree)
        .minimizeFailedScenario(false)
        .check(this::class)

}

/**
 * Cancellation cause tagged with a number, so that test operations can report
 * which close/cancel call they observed.
 *
 * @param number token embedded into [testResult].
 */
private class NumberedCancellationException(number: Int) : CancellationException() {
    /** Textual result reported by test operations that observe this cause. */
    val testResult: String = "Closed($number)"
}

/** Sequential specification of a channel with buffer capacity 2. */
class SequentialBuffered2Channel : SequentialIntChannelBase(capacity = 2)
/** Sequential specification of a channel with buffer capacity 1. */
class SequentialBuffered1Channel : SequentialIntChannelBase(capacity = 1)

/**
 * Sequential specification of an `Int` channel with the given [capacity].
 *
 * State is kept in plain lists with no synchronization: suspended senders (with the
 * element each wants to deliver), suspended receivers, the element buffer, and the
 * close message once the channel has been closed or cancelled.
 *
 * @param capacity buffer size; [Channel.CONFLATED] makes `trySend` replace the buffer content.
 */
@InternalCoroutinesApi
abstract class SequentialIntChannelBase(private val capacity: Int) {
    // Suspended senders, each paired with the element it is trying to send.
    private val senders = ArrayList<Pair<CancellableContinuation<Any>, Int>>()

    // Suspended receivers waiting for an element or for the close message.
    private val receivers = ArrayList<CancellableContinuation<Any>>()

    // Elements accepted by the channel but not yet received.
    private val buffer = ArrayList<Int>()

    // "Closed(<token>)" once closed/cancelled; null while the channel is open.
    private var closedMessage: String? = null

    /**
     * Sends [x], suspending when [trySend] reports that the element could not be placed.
     *
     * @return `Unit` when the element was accepted immediately, the value the suspended
     * continuation is resumed with, or the close message if the channel is already closed.
     */
    suspend fun send(x: Int): Any {
        val outcome = trySend(x)
        if (outcome == true) return Unit
        if (outcome == false) {
            return suspendCancellableCoroutine<Any> { cont ->
                senders.add(cont to x)
            }
        }
        // Neither true nor false: trySend returned the close message.
        return outcome
    }

    /**
     * Tries to hand [element] to a waiting receiver or to store it in the buffer.
     *
     * @return the close message if closed, `true` if the element was accepted,
     * `false` if there is neither a receiver nor free buffer space.
     */
    fun trySend(element: Int): Any {
        closedMessage?.let { return it }
        // A waiting receiver takes priority over buffering in every mode.
        if (resumeFirstReceiver(element)) return true
        return when {
            capacity == Channel.CONFLATED -> {
                // Conflated mode keeps only the most recent element.
                buffer.clear()
                buffer.add(element)
                true
            }
            buffer.size < capacity -> {
                buffer.add(element)
                true
            }
            else -> false
        }
    }

    // Removes receivers from the front of the queue until one accepts `element`;
    // reports whether any of them did.
    private fun resumeFirstReceiver(element: Int): Boolean {
        var delivered = false
        while (!delivered && receivers.isNotEmpty()) {
            delivered = receivers.removeAt(0).resume(element)
        }
        return delivered
    }

    /**
     * Receives an element, suspending when [tryReceive] has nothing to return.
     *
     * @return the element, or the close message.
     */
    suspend fun receive(): Any {
        val ready = tryReceive()
        if (ready != null) return ready
        return suspendCancellableCoroutine<Any> { cont ->
            receivers.add(cont)
        }
    }

    /** Same as [receive] in this specification. */
    suspend fun receiveCatching(): Any = receive()

    /**
     * Non-suspending receive.
     *
     * @return the first buffered element (refilling the buffer from a waiting sender),
     * otherwise the element of a resumed waiting sender, otherwise the close message,
     * otherwise `null`.
     */
    fun tryReceive(): Any? {
        if (buffer.isEmpty()) {
            // Nothing buffered: take directly from a sender, else report closed/empty.
            return resumeFirstSender() ?: closedMessage
        }
        val head = buffer.removeAt(0)
        // The freed slot is handed to the first sender that can still be resumed.
        resumeFirstSender()?.let { refill -> buffer.add(refill) }
        return head
    }

    // Removes senders from the front of the queue until one is resumed with Unit;
    // returns that sender's element, or null if none could be resumed.
    private fun resumeFirstSender(): Int? {
        var handedOver: Int? = null
        while (handedOver == null && senders.isNotEmpty()) {
            val pending = senders.removeAt(0)
            if (pending.first.resume(Unit)) handedOver = pending.second
        }
        return handedOver
    }

    /** Same as [send] in this specification. */
    suspend fun sendViaSelect(element: Int): Any = send(element)

    /** Same as [receive] in this specification. */
    suspend fun receiveViaSelect(): Any = receive()

    /**
     * Closes the channel and resumes every waiting receiver with the close message.
     *
     * @return `false` if the channel was already closed, `true` otherwise.
     */
    fun close(token: Int): Boolean {
        if (closedMessage != null) return false
        val message = "Closed($token)"
        closedMessage = message
        receivers.forEach { waiter -> waiter.resume(message) }
        receivers.clear()
        return true
    }

    /**
     * Closes the channel (a no-op if already closed), then resumes every waiting sender
     * with the current close message and drops all buffered elements.
     */
    fun cancel(token: Int) {
        close(token)
        // close() above guarantees a message is set, by this call or an earlier one.
        val message = closedMessage!!
        senders.forEach { (waiter, _) -> waiter.resume(message) }
        senders.clear()
        buffer.clear()
    }

    /** `true` once the channel has been closed or cancelled. */
    fun isClosedForSend(): Boolean = closedMessage !== null

    /** `true` when closed and neither buffered elements nor waiting senders remain. */
    fun isClosedForReceive(): Boolean =
        closedMessage !== null && buffer.isEmpty() && senders.isEmpty()

    /** `true` only for an open channel with no buffered elements and no waiting senders. */
    fun isEmpty(): Boolean =
        closedMessage == null && buffer.isEmpty() && senders.isEmpty()
}

/**
 * Attempts to resume this continuation with [res] using `tryResume`, completing the
 * resumption with `completeResume` when a token is obtained.
 *
 * @return `true` if a token was obtained and the resumption completed, `false` if
 * `tryResume` returned `null`.
 */
@InternalCoroutinesApi
private fun <T> CancellableContinuation<T>.resume(res: T): Boolean =
    tryResume(res)?.let { token ->
        completeResume(token)
        true
    } ?: false


/** File-level switch read by the model-checking test to pick its iteration and actor counts. */
private val isStressTest: Boolean = false

/**
 * Presents a [BroadcastChannel] as a regular [Channel].
 *
 * The send side is delegated to [broadcast]; the receive side reads from a single
 * subscription ([sub]) opened when this wrapper is constructed.
 *
 * @param broadcast the broadcast channel to wrap.
 */
internal class ChannelViaBroadcast<E>(
    private val broadcast: BroadcastChannel<E>
) : Channel<E>, SendChannel<E> by broadcast {
    /** Subscription that backs every receive-side member of this wrapper. */
    val sub: ReceiveChannel<E> = broadcast.openSubscription()

    override val isClosedForReceive: Boolean
        get() = sub.isClosedForReceive

    override val isEmpty: Boolean
        get() = sub.isEmpty

    override val onReceive: SelectClause1<E>
        get() = sub.onReceive

    override val onReceiveCatching: SelectClause1<ChannelResult<E>>
        get() = sub.onReceiveCatching

    override suspend fun receive(): E {
        return sub.receive()
    }

    override suspend fun receiveCatching(): ChannelResult<E> {
        return sub.receiveCatching()
    }

    override fun tryReceive(): ChannelResult<E> {
        return sub.tryReceive()
    }

    override fun iterator(): ChannelIterator<E> {
        return sub.iterator()
    }

    /** Cancels the underlying broadcast channel rather than only the subscription. */
    override fun cancel(cause: CancellationException?) {
        broadcast.cancel(cause)
    }

    // implementing hidden method anyway, so can cast to an internal class
    @Deprecated(level = DeprecationLevel.HIDDEN, message = "Since 1.2.0, binary compatibility with versions <= 1.1.x")
    override fun cancel(cause: Throwable?): Boolean = error("unsupported")
}
@ndkoval ndkoval added the bug Something isn't working label Mar 19, 2024
@ndkoval
Copy link
Collaborator

ndkoval commented Jul 12, 2024

@avpotapov00, does this issue still reproduce?

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
bug Something isn't working
Projects
None yet
Development

No branches or pull requests

2 participants