We read every piece of feedback, and take your input very seriously.
To see all available qualifiers, see our documentation.
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Please consider the following test. The error reproduces with Lincheck version 2.26 and produces the following output:
The test:
@file:OptIn(InternalCoroutinesApi::class)

/*
 * Lincheck
 *
 * Copyright (C) 2019 - 2024 JetBrains s.r.o.
 *
 * This Source Code Form is subject to the terms of the
 * Mozilla Public License, v. 2.0. If a copy of the MPL was not distributed
 * with this file, You can obtain one at http://mozilla.org/MPL/2.0/.
 */

package org.jetbrains.kotlinx.lincheck_test

import kotlinx.coroutines.*
import kotlinx.coroutines.channels.*
import kotlinx.coroutines.selects.SelectClause1
import kotlinx.coroutines.selects.select
import org.jetbrains.kotlinx.lincheck.Options
import org.jetbrains.kotlinx.lincheck.annotations.Operation
import org.jetbrains.kotlinx.lincheck.annotations.Param
import org.jetbrains.kotlinx.lincheck.check
import org.jetbrains.kotlinx.lincheck.paramgen.IntGen
import org.jetbrains.kotlinx.lincheck.strategy.managed.modelchecking.ModelCheckingOptions
import org.junit.Test

/** Runs the channel test against a [BroadcastChannel] of capacity 1 wrapped in [ChannelViaBroadcast]. */
class Buffered1BroadcastChannelLincheckTest : ChannelLincheckTestBase(
    c = ChannelViaBroadcast(BroadcastChannel(1)),
    sequentialSpecification = SequentialBuffered1Channel::class.java,
    obstructionFree = false
)

/** Runs the channel test against a [BroadcastChannel] of capacity 2 wrapped in [ChannelViaBroadcast]. */
class Buffered2BroadcastChannelLincheckTest : ChannelLincheckTestBase(
    c = ChannelViaBroadcast(BroadcastChannel(2)),
    sequentialSpecification = SequentialBuffered2Channel::class.java,
    obstructionFree = false
)

/**
 * Base class declaring the Lincheck operations exercised on a channel of `Int`s.
 *
 * Operations that observe a [NumberedCancellationException] report its
 * [NumberedCancellationException.testResult] string instead of propagating the exception.
 *
 * @param c the channel under test.
 * @param sequentialSpecification class passed to `sequentialSpecification(...)` inside [customize].
 * @param obstructionFree value passed to `checkObstructionFreedom(...)` in [modelCheckingTest].
 */
@Param.Params(
    Param(name = "value", gen = IntGen::class, conf = "1:9"),
    Param(name = "closeToken", gen = IntGen::class, conf = "1:9")
)
abstract class ChannelLincheckTestBase(
    protected val c: Channel<Int>,
    private val sequentialSpecification: Class<*>,
    private val obstructionFree: Boolean = true
) {
    /** Sends [value] to the channel; yields the exception's `testResult` if a [NumberedCancellationException] is thrown. */
    @Operation(allowExtraSuspension = true, blocking = true)
    suspend fun send(@Param(name = "value") value: Int): Any = try {
        // uselessField.incrementAndGet()
        c.send(value)
    } catch (e: NumberedCancellationException) {
        e.testResult
    }

    /** Receives an element; yields the exception's `testResult` if a [NumberedCancellationException] is thrown. */
    @Operation(allowExtraSuspension = true, blocking = true)
    suspend fun receive(): Any = try {
        c.receive()
    } catch (e: NumberedCancellationException) {
        e.testResult
    }

    /**
     * Receives via `receiveCatching`. A successful result returns the element; a closed result
     * returns the `testResult` of the close cause, which is cast to [NumberedCancellationException].
     */
    @Operation(allowExtraSuspension = true, blocking = true)
    suspend fun receiveCatching(): Any = c.receiveCatching()
        .onSuccess { return it }
        .onClosed { e -> return (e as NumberedCancellationException).testResult }

    /**
     * Non-suspending receive. Returns the element on success; on failure returns the `testResult`
     * when the failure cause is a [NumberedCancellationException], otherwise `null`.
     */
    @Operation(blocking = true)
    fun tryReceive(): Any? = c.tryReceive()
        .onSuccess { return it }
        .onFailure { return if (it is NumberedCancellationException) it.testResult else null }

    /** Receives through a `select` expression with a single `onReceive` clause. */
    @Operation(allowExtraSuspension = true, blocking = true)
    suspend fun receiveViaSelect(): Any = try {
        select<Int> { c.onReceive { it } }
    } catch (e: NumberedCancellationException) {
        e.testResult
    }

    /** Closes the channel with a [NumberedCancellationException] built from [token]. */
    @Operation(causesBlocking = true, blocking = true)
    fun close(@Param(name = "closeToken") token: Int): Boolean = c.close(NumberedCancellationException(token))

    /** Cancels the channel with a [NumberedCancellationException] built from [token]. */
    @Operation(causesBlocking = true, blocking = true)
    fun cancel(@Param(name = "closeToken") token: Int) = c.cancel(NumberedCancellationException(token))

    /** Exposes `isClosedForSend` of the channel under test. */
    @Operation(blocking = true)
    fun isClosedForSend() = c.isClosedForSend

    /**
     * Configures the receiver options with zero "before" actors, the sequential specification
     * supplied to the constructor, and one fixed three-thread custom scenario.
     *
     * NOTE(review): nothing in this file calls this extension ([modelCheckingTest] does not chain
     * it) and the [isStressTest] parameter is not read — confirm whether it was meant to be
     * applied before `check(...)`.
     */
    fun <O : Options<O, *>> O.customize(isStressTest: Boolean) =
        actorsBefore(0).sequentialSpecification(sequentialSpecification)
            .addCustomScenario {
                parallel {
                    thread {
                        actor(::receiveCatching)
                        actor(::cancel, 5)
                    }
                    thread {
                        actor(::send, 5)
                        actor(::receive)
                    }
                    thread {
                        actor(::receive)
                        actor(::close, 5)
                    }
                }
            }

    /** Runs Lincheck model checking over this class; sizes depend on the file-level `isStressTest` flag. */
    @Test
    fun modelCheckingTest() = ModelCheckingOptions()
        .iterations(if (isStressTest) 200 else 20)
        .invocationsPerIteration(if (isStressTest) 10_000 else 1_000)
        .actorsBefore(if (isStressTest) 3 else 1)
        .threads(3)
        .actorsPerThread(if (isStressTest) 3 else 2)
        .actorsAfter(if (isStressTest) 3 else 0)
        .checkObstructionFreedom(obstructionFree)
        .minimizeFailedScenario(false)
        .check(this::class)
}

/** Cancellation exception tagged with a number; [testResult] is the string `Closed(<number>)`. */
private class NumberedCancellationException(number: Int) : CancellationException() {
    val testResult = "Closed($number)"
}

/** Sequential specification with capacity 2. */
class SequentialBuffered2Channel : SequentialIntChannelBase(2)

/** Sequential specification with capacity 1. */
class SequentialBuffered1Channel : SequentialIntChannelBase(1)

/**
 * Single-threaded model of an `Int` channel, used as a sequential specification.
 *
 * State consists of a FIFO [buffer], lists of suspended [senders] (with their pending element)
 * and [receivers], and [closedMessage], which is non-null once the channel has been closed.
 *
 * @param capacity maximum number of buffered elements; `Channel.CONFLATED` is handled specially
 *   in [trySend].
 */
@InternalCoroutinesApi
abstract class SequentialIntChannelBase(private val capacity: Int) {
    private val senders = ArrayList<Pair<CancellableContinuation<Any>, Int>>()
    private val receivers = ArrayList<CancellableContinuation<Any>>()
    private val buffer = ArrayList<Int>()
    private var closedMessage: String? = null

    /**
     * Tries [trySend]; on `true` returns `Unit`, on `false` suspends and registers the sender,
     * otherwise (closed) returns the close message.
     */
    suspend fun send(x: Int): Any = when (val offerRes = trySend(x)) {
        true -> Unit
        false -> suspendCancellableCoroutine { cont -> senders.add(cont to x) }
        else -> offerRes
    }

    /**
     * Attempts to hand [element] to a waiting receiver or place it in the buffer.
     *
     * @return the close message if closed, `true` if the element was accepted, `false` if the
     *   buffer is full and no receiver could be resumed.
     */
    fun trySend(element: Int): Any {
        if (closedMessage !== null) return closedMessage!!
        if (capacity == Channel.CONFLATED) {
            if (resumeFirstReceiver(element)) return true
            // Conflated: the newest element replaces whatever was buffered.
            buffer.clear()
            buffer.add(element)
            return true
        }
        if (resumeFirstReceiver(element)) return true
        if (buffer.size < capacity) {
            buffer.add(element)
            return true
        }
        return false
    }

    /** Removes receivers in FIFO order until one is successfully resumed with [element]. */
    private fun resumeFirstReceiver(element: Int): Boolean {
        while (receivers.isNotEmpty()) {
            val r = receivers.removeAt(0)
            if (r.resume(element)) return true
        }
        return false
    }

    /** Returns the result of [tryReceive] if non-null; otherwise suspends and registers the receiver. */
    suspend fun receive(): Any = tryReceive() ?: suspendCancellableCoroutine { cont -> receivers.add(cont) }

    /** Same as [receive] in this model. */
    suspend fun receiveCatching() = receive()

    /**
     * Takes the first buffered element (refilling the buffer from a resumed sender), or the
     * element of a resumed sender, or the close message if closed, or `null`.
     */
    fun tryReceive(): Any? {
        if (buffer.isNotEmpty()) {
            val el = buffer.removeAt(0)
            // A slot was freed: move a waiting sender's element into the buffer.
            resumeFirstSender().also {
                if (it !== null) buffer.add(it)
            }
            return el
        }
        resumeFirstSender()?.also { return it }
        if (closedMessage !== null) return closedMessage
        return null
    }

    /** Removes senders in FIFO order until one is successfully resumed; returns its element or `null`. */
    private fun resumeFirstSender(): Int? {
        while (senders.isNotEmpty()) {
            val (s, el) = senders.removeAt(0)
            if (s.resume(Unit)) return el
        }
        return null
    }

    /** Same as [send] in this model. */
    suspend fun sendViaSelect(element: Int) = send(element)

    /** Same as [receive] in this model. */
    suspend fun receiveViaSelect() = receive()

    /**
     * Marks the channel closed with message `Closed(<token>)` and resumes all waiting receivers
     * with that message.
     *
     * @return `false` if the channel was already closed, `true` otherwise.
     */
    fun close(token: Int): Boolean {
        if (closedMessage !== null) return false
        closedMessage = "Closed($token)"
        for (r in receivers) r.resume(closedMessage!!)
        receivers.clear()
        return true
    }

    /**
     * Closes the channel (no-op if already closed), then resumes all waiting senders with the
     * current close message and drops the buffered elements.
     */
    fun cancel(token: Int) {
        close(token)
        for ((s, _) in senders) s.resume(closedMessage!!)
        senders.clear()
        buffer.clear()
    }

    fun isClosedForSend(): Boolean = closedMessage !== null

    fun isClosedForReceive(): Boolean = isClosedForSend() && buffer.isEmpty() && senders.isEmpty()

    /** Returns `false` once closed; otherwise whether there are no buffered elements and no waiting senders. */
    fun isEmpty(): Boolean {
        if (closedMessage !== null) return false
        return buffer.isEmpty() && senders.isEmpty()
    }
}

/**
 * Resumes this continuation with [res] via `tryResume`/`completeResume`.
 *
 * @return `false` if `tryResume` returned `null`, `true` otherwise.
 */
@InternalCoroutinesApi
private fun <T> CancellableContinuation<T>.resume(res: T): Boolean {
    val token = tryResume(res) ?: return false
    completeResume(token)
    return true
}

private val isStressTest = false

/**
 * Presents a [BroadcastChannel] as a [Channel]: the send side is delegated to [broadcast] and the
 * receive side is served by a single subscription ([sub]) opened at construction.
 */
internal class ChannelViaBroadcast<E>(
    private val broadcast: BroadcastChannel<E>
) : Channel<E>, SendChannel<E> by broadcast {
    val sub = broadcast.openSubscription()

    override val isClosedForReceive: Boolean get() = sub.isClosedForReceive
    override val isEmpty: Boolean get() = sub.isEmpty
    override suspend fun receive(): E = sub.receive()
    override suspend fun receiveCatching(): ChannelResult<E> = sub.receiveCatching()
    override fun iterator(): ChannelIterator<E> = sub.iterator()
    override fun tryReceive(): ChannelResult<E> = sub.tryReceive()

    /** Cancels the underlying broadcast channel (not only the subscription). */
    override fun cancel(cause: CancellationException?) = broadcast.cancel(cause)

    // implementing hidden method anyway, so can cast to an internal class
    @Deprecated(level = DeprecationLevel.HIDDEN, message = "Since 1.2.0, binary compatibility with versions <= 1.1.x")
    override fun cancel(cause: Throwable?): Boolean = error("unsupported")

    override val onReceive: SelectClause1<E> get() = sub.onReceive
    override val onReceiveCatching: SelectClause1<ChannelResult<E>> get() = sub.onReceiveCatching
}
The text was updated successfully, but these errors were encountered:
@avpotapov00, does this issue still reproduce?
Sorry, something went wrong.
avpotapov00
No branches or pull requests
Please consider the following test.
The error reproduces with Lincheck version 2.26 and produces the following output:
The test:
The text was updated successfully, but these errors were encountered: