
provide a chunkDecoder pipe #273

Closed
samspills opened this issue Jul 22, 2021 · 1 comment · Fixed by #274
Comments

@samspills
Contributor

The current decoder pipe emits each element back into the stream as a singleton chunk. I'd like to propose adding a second method that decodes while preserving the underlying chunks of the stream. I have this work done in a fork (chunkDecoder implementation), and I'd be happy to open a PR to contribute. First, though, I wanted to check whether others would find this useful.
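To make the difference concrete, here's a minimal pure-Scala sketch of the two emission behaviours. This is not the fs2 API: chunks are modeled as `Vector[Vector[A]]`, and the decode step is a hypothetical `String => Int` standing in for a circe `Decoder`.

```scala
object ChunkingModel {
  // Hypothetical decode step standing in for a circe Decoder.
  def decode(s: String): Int = s.toInt

  // Models the current `decoder` pipe: every decoded element is
  // re-emitted as its own singleton chunk.
  def decoderModel(chunks: Vector[Vector[String]]): Vector[Vector[Int]] =
    chunks.flatten.map(s => Vector(decode(s)))

  // Models the proposed `chunkDecoder`: each input chunk maps to one
  // output chunk, so the original chunk boundaries are preserved.
  def chunkDecoderModel(chunks: Vector[Vector[String]]): Vector[Vector[Int]] =
    chunks.map(_.map(decode))
}
```

Given `Vector(Vector("1", "2", "3"), Vector("4", "5"))`, `decoderModel` emits five singleton chunks, while `chunkDecoderModel` emits the same two chunks it was given.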

Motivation

I'm proposing this because of a weird performance degradation I hit in a work project after upgrading from fs2 v2.5.9 to fs2 v3.0.6. Our use case looks like this:

import cats.effect.IO
import fs2.Stream
import io.circe.Json
import io.circe.fs2.decoder

val stream: Stream[IO, Json] = ???
val blerf = stream
  .through(decoder[IO, Foo]) // decode each Json element into a Foo
  .map(doTransform)
  .chunkN(25000)             // rechunk into batches of 25000
  .evalMap(doBulkRequest)
  .compile
  .drain

So we're decoding some JSON into a case class, applying a transform to the data, and then rechunking the stream so we can produce a bulk request of 25000 Foos at a time. Between fs2 v2.5.9 and fs2 v3.0.6 the chunk logic was simplified. In particular, the Queue behaviour when rechunking changed such that there is now a performance tradeoff when rechunking very many small chunks.

Key tradeoff is that if you have lots of tiny chunks backing a chunk, index based access is O(n), but this is a rare pattern in practice, and folks can always call .compact to flatten to a single array backed chunk.
-- typelevel/fs2#2181 (comment)

The .compact method does solve the problem, but the "rare pattern" described is made less rare by the current decoder. Using a chunkDecoder would keep our original stream chunking and let us avoid the call to .compact and the copying that comes with it.

Lazy Evaluation

The evaluation differences would need to be made clear in the docstrings. Decoding chunks will be lazy, but each element in a chunk will be decoded eagerly. For example:

stream.chunkN(199).through(chunkDecoder[IO, Foo]).take(400) 

would decode 597 elements in total (3 full chunks) in order to take the 400 objects, whereas the standard decoder would decode only the 400 objects being taken. I don't think this is a problem, but it's important to call out. I'd add a docstring to the existing decoder method to make its behaviour obvious compared to the proposed chunkDecoder.
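The arithmetic behind the 597 figure can be checked directly in plain Scala, using the chunk size and take count from the example above:

```scala
val chunkSize = 199
val wanted    = 400

// chunkDecoder decodes whole chunks, so taking 400 elements forces
// ceil(400 / 199) = 3 chunks to be decoded in full.
val chunksNeeded    = (wanted + chunkSize - 1) / chunkSize
val elementsDecoded = chunksNeeded * chunkSize

assert(chunksNeeded == 3)
assert(elementsDecoded == 597)
// 597 - 400 = 197 elements decoded beyond what take(400) keeps.
```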

@travisbrown
Member

Sounds reasonable to me!
