Skip to content

Commit

Permalink
Remove asView source transformation methods (#279)
Browse files Browse the repository at this point in the history
  • Loading branch information
adamw authored Feb 7, 2025
1 parent 3f6b304 commit b7bdd32
Show file tree
Hide file tree
Showing 4 changed files with 2 additions and 242 deletions.
3 changes: 1 addition & 2 deletions core/src/main/scala/ox/channels/Channel.scala
Original file line number Diff line number Diff line change
Expand Up @@ -89,8 +89,7 @@ trait Source[+T] extends SourceOps[T] with SourceDrainOps[T]:
* for receive, sending values is also not possible, [[isClosedForSend]] will return `true`.
* @return
* `false`, if more values **might** be received from the channel, when calling [[Source.receive()]]. However, it's not guaranteed that
* some values will be available. They might be received concurrently, or filtered out if the channel is created using
* [[Source.mapAsView()]], [[Source.filterAsView()]] or [[Source.collectAsView()]].
* some values will be available - they might be received concurrently.
*/
def isClosedForReceive: Boolean = delegate.isClosedForReceive

Expand Down
62 changes: 1 addition & 61 deletions core/src/main/scala/ox/channels/SourceOps.scala
Original file line number Diff line number Diff line change
@@ -1,67 +1,11 @@
package ox.channels

import com.softwaremill.jox.Source as JSource
import ox.*

import java.util

trait SourceOps[+T]:
outer: Source[T] =>
// view ops (lazy)

/** Lazily-evaluated map: creates a view of this source, where the results of [[receive]] will be transformed on the consumer's thread
* using the given function `f`. For an eager, asynchronous version, see [[map]].
*
* The same logic applies to receive clauses created using this source, which can be used in [[select]].
*
* @param f
* The mapping function. Results should not be `null`.
* @return
* A source which is a view of this source, with the mapping function applied.
*/
def mapAsView[U](f: T => U): Source[U] = new Source[U]:
override val delegate: JSource[Any] = outer.delegate.asInstanceOf[JSource[T]].collectAsView(t => f(t))

/** Lazily-evaluated tap: creates a view of this source, where the results of [[receive]] will be applied to the given function `f` on the
* consumer's thread. Useful for side-effects without result values, like logging and debugging. For an eager, asynchronous version, see
* [[tap]].
*
* The same logic applies to receive clauses created using this source, which can be used in [[select]].
*
* @param f
* The consumer function.
* @return
* A source which is a view of this source, with the consumer function applied.
*/
def tapAsView(f: T => Unit): Source[T] = mapAsView(t =>
f(t); t
)

/** Lazily-evaluated filter: Creates a view of this source, where the results of [[receive]] will be filtered on the consumer's thread
* using the given predicate `p`. For an eager, asynchronous version, see [[filter]].
*
* The same logic applies to receive clauses created using this source, which can be used in [[select]].
*
* @param f
* The predicate to use for filtering.
* @return
* A source which is a view of this source, with the filtering function applied.
*/
def filterAsView(f: T => Boolean): Source[T] = new Source[T]:
override val delegate: JSource[Any] = outer.delegate.filterAsView(t => f(t.asInstanceOf[T]))

/** Creates a view of this source, where the results of [[receive]] will be transformed on the consumer's thread using the given function
* `f`. If the function is not defined at a value, the value will be skipped. For an eager, asynchronous version, see [[collect]].
*
* The same logic applies to receive clauses created using this source, which can be used in [[select]].
*
* @param f
* The collecting function. Results should not be `null`.
* @return
* A source which is a view of this source, with the collecting function applied.
*/
def collectAsView[U](f: PartialFunction[T, U]): Source[U] = new Source[U]:
override val delegate: JSource[Any] = outer.delegate.collectAsView(t => f.applyOrElse(t.asInstanceOf[T], _ => null))

// run ops (eager)

Expand All @@ -82,8 +26,6 @@ trait SourceOps[+T]:
* Must be run within a scope, as a child fork is created, which receives from this source and sends the mapped values to the resulting
* one.
*
* For a lazily-evaluated version, see [[mapAsView]].
*
* @param f
* The mapping function.
* @return
Expand All @@ -110,7 +52,7 @@ trait SourceOps[+T]:
* Must be run within a scope, as a child fork is created, which receives from this source and sends the mapped values to the resulting
* one.
*
* Useful for side-effects without result values, like logging and debugging. For a lazily-evaluated version, see [[tapAsView]].
* Useful for side-effects without result values, like logging and debugging.
*
* @param f
* The consumer function.
Expand All @@ -130,8 +72,6 @@ trait SourceOps[+T]:
* Must be run within a scope, as a child fork is created, which receives from this source and sends the mapped values to the resulting
* one.
*
* For a lazily-evaluated version, see [[collectAsView]].
*
* @param f
* The mapping function.
* @return
Expand Down
158 changes: 0 additions & 158 deletions core/src/test/scala/ox/channels/SourceOpsAsViewTest.scala

This file was deleted.

21 changes: 0 additions & 21 deletions doc/streaming/transforming-channels.md
Original file line number Diff line number Diff line change
Expand Up @@ -49,27 +49,6 @@ these channels by default is 16 (buffered). This can be overridden by providing
(v: Source[Int]).map(_ + 1)(using BufferCapacity(10))
```

## Transforming lazily

A limited number of transformations can be applied to a source without creating a new channel and a new fork, which
computes the transformation. These include: `.mapAsView`, `.filterAsView` and `.collectAsView`.

For example:

```scala mdoc:compile-only
import ox.channels.{Channel, Source}

val c = Channel.rendezvous[String]
val c2: Source[Int] = c.mapAsView(s => s.length())
```

The mapping function (`s => s.length()`) will only be invoked when the source is consumed (using `.receive()` or
`select`), on the consumer's thread. This is in contrast to `.map`, where the mapping function is invoked on a separate
fork.

Hence, creating views doesn't need to be run within a scope, and creating the view itself doesn't consume any elements
from the source on which it is run.

## Discharging channels

Values of a source can be discharged using methods such as `.foreach`, `.toList`, `.pipeTo` or `.drain`:
Expand Down

0 comments on commit b7bdd32

Please sign in to comment.