Skip to content

Commit

Permalink
Merge pull request #1004 from mpilquist/topic/groupAdjacentBy
Browse files Browse the repository at this point in the history
Renamed groupBy to groupAdjacentBy and changed its return type to provide a Segment instead of a Vector
  • Loading branch information
pchlupacek authored Nov 28, 2017
2 parents e2d2536 + 9e2d818 commit 8ba9365
Show file tree
Hide file tree
Showing 2 changed files with 20 additions and 20 deletions.
8 changes: 4 additions & 4 deletions core/jvm/src/test/scala/fs2/PipeSpec.scala
Original file line number Diff line number Diff line change
Expand Up @@ -204,13 +204,13 @@ class PipeSpec extends Fs2Spec {
runLog(s.get.forall(f)) shouldBe Vector(runLog(s.get).forall(f))
}

"groupBy" in forAll { (s: PureStream[Int], n: SmallPositive) =>
"groupAdjacentBy" in forAll { (s: PureStream[Int], n: SmallPositive) =>
val f = (i: Int) => i % n.get
val s1 = s.get.groupBy(f)
val s1 = s.get.groupAdjacentBy(f)
val s2 = s.get.map(f).changes
runLog(s1.map(_._2)).flatten shouldBe runLog(s.get)
runLog(s1.map(_._2)).flatMap(_.toVector) shouldBe runLog(s.get)
runLog(s1.map(_._1)) shouldBe runLog(s2)
runLog(s1.map { case (k, vs) => vs.forall(f(_) == k) }) shouldBe runLog(s2.map(_ => true))
runLog(s1.map { case (k, vs) => vs.toVector.forall(f(_) == k) }) shouldBe runLog(s2.map(_ => true))
}

"head" in forAll { (s: PureStream[Int]) =>
Expand Down
32 changes: 16 additions & 16 deletions core/shared/src/main/scala/fs2/Stream.scala
Original file line number Diff line number Diff line change
Expand Up @@ -534,53 +534,53 @@ final class Stream[+F[_],+O] private(private val free: FreeC[Algebra[Nothing,Not
this.pull.forall(p).flatMap(Pull.output1).stream

/**
* Partitions the input into a stream of chunks according to a discriminator function.
* Partitions the input into a stream of segments according to a discriminator function.
*
* Each chunk in the source stream is grouped using the supplied discriminator function,
* and a group is emitted each time the value returned by the discriminator function changes.
* Only adjacent elements are grouped, so the same key may be emitted more than once.
*
* @example {{{
* scala> import cats.implicits._
* scala> Stream("Hello", "Hi", "Greetings", "Hey").groupBy(_.head).toList
* res0: List[(Char,Vector[String])] = List((H,Vector(Hello, Hi)), (G,Vector(Greetings)), (H,Vector(Hey)))
* scala> Stream("Hello", "Hi", "Greetings", "Hey").groupAdjacentBy(_.head).toList.map { case (k,vs) => k -> vs.toList }
* res0: List[(Char,List[String])] = List((H,List(Hello, Hi)), (G,List(Greetings)), (H,List(Hey)))
* }}}
*/
def groupBy[O2](f: O => O2)(implicit eq: Eq[O2]): Stream[F, (O2, Vector[O])] = {
def groupAdjacentBy[O2](f: O => O2)(implicit eq: Eq[O2]): Stream[F, (O2, Segment[O,Unit])] = {

def go(current: Option[(O2,Vector[O])], s: Stream[F,O]): Pull[F,(O2,Vector[O]),Unit] = {
def go(current: Option[(O2,Segment[O,Unit])], s: Stream[F,O]): Pull[F,(O2,Segment[O,Unit]),Unit] = {
s.pull.unconsChunk.flatMap {
case Some((hd,tl)) =>
val (k1, out) = current.getOrElse((f(hd(0)), Vector[O]()))
doChunk(hd, tl, k1, out, Vector.empty)
val (k1, out) = current.getOrElse((f(hd(0)), Segment.empty[O]))
doChunk(hd, tl, k1, out, None)
case None =>
val l = current.map { case (k1, out) => Pull.output1((k1, out)) } getOrElse Pull.pure(())
l >> Pull.done
}
}

@annotation.tailrec
def doChunk(chunk: Chunk[O], s: Stream[F,O], k1: O2, out: Vector[O], acc: Vector[(O2, Vector[O])]): Pull[F,(O2,Vector[O]),Unit] = {
def doChunk(chunk: Chunk[O], s: Stream[F,O], k1: O2, out: Segment[O,Unit], acc: Option[Segment[(O2, Segment[O,Unit]),Unit]]): Pull[F,(O2,Segment[O,Unit]),Unit] = {
val differsAt = chunk.indexWhere(v => eq.neqv(f(v), k1)).getOrElse(-1)
if (differsAt == -1) {
// whole chunk matches the current key, add this chunk to the accumulated output
val newOut: Vector[O] = out ++ chunk.toVector
if (acc.isEmpty) {
go(Some((k1, newOut)), s)
} else {
// potentially outputs one additional chunk (by splitting the last one in two)
Pull.output(Chunk.vector(acc)) >> go(Some((k1, newOut)), s)
val newOut: Segment[O,Unit] = out ++ chunk
acc match {
case None => go(Some((k1, newOut)), s)
case Some(acc) =>
// potentially outputs one additional chunk (by splitting the last one in two)
Pull.output(acc) >> go(Some((k1, newOut)), s)
}
} else {
// at least part of this chunk does not match the current key, need to group and retain chunkiness
// split the chunk into the bit where the keys match and the bit where they don't
val matching = chunk.take(differsAt)
val newOut: Vector[O] = out ++ matching.toVector
val newOut: Segment[O,Unit] = out ++ matching.voidResult
val nonMatching = chunk.drop(differsAt).fold(_ => Chunk.empty, identity).toChunk
// nonMatching is guaranteed to be non-empty here, because differsAt is the index of an element in this chunk
// whose key differs from k1
val k2 = f(nonMatching(0))
doChunk(nonMatching, s, k2, Vector[O](), acc :+ ((k1, newOut)))
doChunk(nonMatching, s, k2, Segment.empty[O], Some(acc.getOrElse(Segment.empty) ++ Segment((k1, newOut))))
}
}

Expand Down

0 comments on commit 8ba9365

Please sign in to comment.