Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Refactor merge implementation #3242

Merged
merged 11 commits into from
Jul 27, 2023
Merged

Conversation

zainab-ali
Copy link
Contributor

@zainab-ali zainab-ali commented Jun 19, 2023

To give some context, @djspiewak and I were looking through merge and found the implementation difficult to understand, then tried to rewrite it to use new cats-effect primitives.

I'm not sure that the code I've settled on is actually any better than the old implementation, but think it's worth getting a second opinion on.

The main differences are:

  • The use of SingallingRef over two Deferreds for managing the upstream state.
  • The use of evalMap(guard.aquire) instead of an explicit pull for pulling from upstream.

core/shared/src/main/scala/fs2/Stream.scala Outdated Show resolved Hide resolved
core/shared/src/main/scala/fs2/Stream.scala Outdated Show resolved Hide resolved
core/shared/src/main/scala/fs2/Stream.scala Outdated Show resolved Hide resolved
core/shared/src/main/scala/fs2/Stream.scala Outdated Show resolved Hide resolved
zainab-ali and others added 6 commits June 20, 2023 14:18
Co-authored-by: Diego E. Alonso <diesalbla@gmail.com>
Co-authored-by: Diego E. Alonso <diesalbla@gmail.com>
Co-authored-by: Diego E. Alonso <diesalbla@gmail.com>
@zainab-ali
Copy link
Contributor Author

The most recent builds failed due to failures in the following tests:

  • StreamCombinatorsSuite - awakeEvery - basic on MacOS
  • UnixSocketsSuite.echoes - jdk on MacOS

Neither of these failures are related to the merge implementation. I assume these tests are flaky and should be marked as such, or corrected?

@armanbilge
Copy link
Member

or corrected

most time-based tests can be run on the Cats Effect test runtime with executeEmbed, this should fix the flakiness

group("awakeEvery") {
test("basic") {
Stream
.awakeEvery[IO](500.millis)

@zainab-ali
Copy link
Contributor Author

I've used TestControl for most viable cases in the StreamCombinatorsSuite. I've also added a pretty printer for the scalacheck failures so we can see the elements of a stream on test failure.

Looking at the size of the changeset, it might be worth pulling it out into a different PR - if so, let me know.

@@ -55,7 +55,7 @@ class HashSuite extends Fs2Suite with HashSuitePlatform with TestPlatform {
}

test("empty input") {
Stream.empty[IO].through(sha1).compile.count.assertEquals(20L)
Stream.empty.covary[IO].through(sha1).compile.count.assertEquals(20L)
Copy link
Contributor Author

@zainab-ali zainab-ali Jun 25, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

On adding the pretty printer implicit conversion, Stream.empty resolved to the val empty instead of def empty[A] on the monoidK instance. I'm at a loss as to why.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Comment on lines 110 to 112
Stream
.empty[IO]
.empty
.covary[IO]
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Out of curiosity, why this change?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Oh, sorry, I missed #3242 (comment). Thanks!

Copy link
Contributor Author

@zainab-ali zainab-ali Jun 25, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

There's a compile error in the build (locally too), which I'm still figuring out:

[error] /home/runner/work/fs2/fs2/io/js/src/test/scala/fs2/io/IoPlatformSuite.scala:111:13: value empty of type fs2.Stream[fs2.Pure,Nothing] does not take type parameters.
[error]       .empty[IO]
[error]             ^
[error] one error found

It's caused by the implicit added for the pretty printer.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think I know why:

Stream.empty[IO] uses an apply, so is desugared to Stream.empty.apply[IO]:

The apply[F[_]] function is added by PureOps:

  implicit final class PureOps[O](private val self: Stream[Pure, O]) extends AnyVal {
    def apply[F[_]]: Stream[F, O] = covary
}

This conflicts with the apply function in the scalacheck pretty printer.

@mpilquist mpilquist merged commit 908caf3 into typelevel:main Jul 27, 2023
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

4 participants