Skip to content
This repository has been archived by the owner on Jan 13, 2025. It is now read-only.

Commit

Permalink
Migrate to ZIO 2.0 (#460)
Browse files Browse the repository at this point in the history
* Migration to ZIO 2.0 - run the scalafix and do obvious manual fixes

* Migrate transducers to ZIO 2.0

* Chip away more errors

* Finish code migration to ZIO 2.0, migrate tests

* Final tweaks

* Add disableAutoTrace all over the place

* Bump zio snapshot, remove TODO

* Bump zio, improve example, add tracing to ManagedOps#useForked

* initial work

* fix version specific issue

* format

* update documentation

* upgrade scala 3 version

* blocking

* format

* fix conflicting versions

Co-authored-by: Zdeněk Hřebíček <10925068+zhrebicek@users.noreply.github.com>
Co-authored-by: Zdeněk Hřebíček <zdenek.hrebicek@gmail.com>
  • Loading branch information
3 people authored Jan 20, 2022
1 parent a5452df commit 84aa783
Show file tree
Hide file tree
Showing 66 changed files with 1,537 additions and 1,183 deletions.
2 changes: 1 addition & 1 deletion .github/workflows/ci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ jobs:
fail-fast: false
matrix:
java: ['adopt@1.8', 'adopt@1.11']
scala: ['2.11.12', '2.12.15', '2.13.6', '3.0.2']
scala: ['2.11.12', '2.12.15', '2.13.6', '3.1.0']
steps:
- name: Checkout current branch
uses: actions/checkout@v2.4.0
Expand Down
36 changes: 32 additions & 4 deletions .gitignore
Original file line number Diff line number Diff line change
@@ -1,9 +1,37 @@
project/zecret
project/travis-deploy-key
project/secrets.tar.xz
target
test-output/
.sbtopts
.bsp
project/.sbt
*.tmp
website/i18n
website/yarn.lock
website/static/api
test-output/
.bloop
.metals
metals.sbt
*/metals.sbt
.idea
coursier
.DS_Store
metals.sbt
project/metals.sbt
project/project/metals.sbt
sbt.json
.bsp/
project/project/
*.iml

# if you are here to add your IDE's files please read this instead:
# https://stackoverflow.com/questions/7335420/global-git-ignore#22885996
website/node_modules
website/.docusaurus
website/build
website/docs
website/static/api*
website/versioned_docs
website/i18n/en.json
website/yarn.lock
website/package-lock.json
website/static/api
.bsp/
5 changes: 5 additions & 0 deletions .vscode/settings.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
{
"files.watcherExclude": {
"**/target": true
}
}
24 changes: 16 additions & 8 deletions build.sbt
Original file line number Diff line number Diff line change
Expand Up @@ -19,22 +19,30 @@ addCommandAlias(
";zioNio/test;examples/test"
)

val zioVersion = "1.0.12"
val zioVersion = "2.0.0-RC1"

lazy val zioNio = project
.in(file("nio"))
.settings(stdSettings("zio-nio"))
.settings(
libraryDependencies ++= Seq(
"dev.zio" %% "zio" % zioVersion,
"dev.zio" %% "zio-streams" % zioVersion,
"org.scala-lang.modules" %% "scala-collection-compat" % "2.5.0",
"dev.zio" %% "zio-test" % zioVersion % Test,
"dev.zio" %% "zio-test-sbt" % zioVersion % Test
"dev.zio" %% "zio" % zioVersion,
"dev.zio" %% "zio-streams" % zioVersion,
("org.scala-lang.modules" %% "scala-collection-compat" % "2.6.0").cross(CrossVersion.for3Use2_13),
"dev.zio" %% "zio-test" % zioVersion % Test,
"dev.zio" %% "zio-test-sbt" % zioVersion % Test
),
testFrameworks := Seq(new TestFramework("zio.test.sbt.ZTestFramework"))
)
.settings(dottySettings)
.settings(scala3Settings)
.settings(
scalacOptions ++= {
if (scalaVersion.value == Scala3)
Seq.empty
else
Seq("-P:silencer:globalFilters=[zio.stacktracer.TracingImplicits.disableAutoTrace]")
}
)

lazy val docs = project
.in(file("zio-nio-docs"))
Expand All @@ -59,5 +67,5 @@ lazy val examples = project
publish / skip := true,
moduleName := "examples"
)
.settings(dottySettings)
.settings(scala3Settings)
.dependsOn(zioNio)
13 changes: 6 additions & 7 deletions docs/essentials/charsets.md
Original file line number Diff line number Diff line change
Expand Up @@ -54,24 +54,23 @@ import zio.nio.channels.FileChannel
import zio.nio.channels._
import zio.nio.file.Path
import zio.stream.ZStream
import zio.blocking.Blocking
import zio.console
import zio.Console
import zio.ZIO

// dump a file encoded in ISO8859 to the console

FileChannel.open(Path("iso8859.txt")).useNioBlockingOps { fileOps =>
val inStream: ZStream[Blocking, Exception, Byte] = ZStream.repeatEffectChunkOption {
val inStream: ZStream[Any, Exception, Byte] = ZStream.repeatZIOChunkOption {
fileOps.readChunk(1000).asSomeError.flatMap { chunk =>
if (chunk.isEmpty) ZIO.fail(None) else ZIO.succeed(chunk)
}
}

// apply decoding transducer
val charStream: ZStream[Blocking, Exception, Char] =
inStream.transduce(Charset.Standard.iso8859_1.newDecoder.transducer())
val charStream: ZStream[Any, Exception, Char] =
inStream.via(Charset.Standard.iso8859_1.newDecoder.transducer())

console.putStrLn("ISO8859 file contents:") *>
charStream.foreachChunk(chars => console.putStr(chars.mkString))
Console.printLine("ISO8859 file contents:") *>
charStream.runForeachChunk(chars => Console.printLine(chars.mkString))
}
```
12 changes: 6 additions & 6 deletions docs/essentials/files.md
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ Required imports for presented snippets:
import zio._
import zio.nio.channels._
import zio.nio.file._
import zio.console._
import zio.Console._
```

## Basic operations
Expand All @@ -38,7 +38,7 @@ val readWriteOp = (channel: AsynchronousFileChannel) =>
for {
chunk <- channel.readChunk(20, 0L)
text = chunk.map(_.toChar).mkString
_ <- putStrLn(text)
_ <- printLine(text)

input = Chunk.fromArray("message".toArray.map(_.toByte))
_ <- channel.writeChunk(input, 0L)
Expand All @@ -51,12 +51,12 @@ they are not in effects. Apart from basic acquire/release actions, the core API
```scala mdoc:silent
val lockOp = (channel: AsynchronousFileChannel) =>
for {
isShared <- channel.lock().bracket(_.release.ignore)(l => IO.succeed(l.isShared))
_ <- putStrLn(isShared.toString) // false
isShared <- channel.lock().acquireReleaseWith(_.release.ignore)(l => IO.succeed(l.isShared))
_ <- printLine(isShared.toString) // false

managed = Managed.make(channel.lock(position = 0, size = 10, shared = false))(_.release.ignore)
managed = Managed.acquireReleaseWith(channel.lock(position = 0, size = 10, shared = false))(_.release.ignore)
isOverlaping <- managed.use(l => IO.succeed(l.overlaps(5, 20)))
_ <- putStrLn(isOverlaping.toString) // true
_ <- printLine(isOverlaping.toString) // true
} yield ()
```

Expand Down
3 changes: 1 addition & 2 deletions docs/essentials/index.md
Original file line number Diff line number Diff line change
Expand Up @@ -27,13 +27,12 @@ When reading from channels, the end of the stream may be reached at any time. Th

```scala mdoc:silent
import zio._
import zio.blocking.Blocking
import zio.nio._
import zio.nio.channels._
import zio.nio.file.Path
import java.io.IOException

val read100: ZIO[Blocking, Option[IOException], Chunk[Byte]] =
val read100: ZIO[Any, Option[IOException], Chunk[Byte]] =
FileChannel.open(Path("foo.txt"))
.useNioBlockingOps(_.readChunk(100))
.eofCheck
Expand Down
5 changes: 2 additions & 3 deletions docs/essentials/resources.md
Original file line number Diff line number Diff line change
Expand Up @@ -13,14 +13,13 @@ The most straight-forward way to use a managed resource is with the `use` method

```scala mdoc:silent
import zio._
import zio.blocking.Blocking
import zio.nio.channels._
import zio.nio.file.Path
import java.io.IOException

def useChannel(f: FileChannel): ZIO[Blocking, IOException, Unit] = ???
def useChannel(f: FileChannel): ZIO[Any, IOException, Unit] = ???

val effect: ZIO[Blocking, IOException, Unit] = FileChannel.open(Path("foo.txt"))
val effect: ZIO[Any, IOException, Unit] = FileChannel.open(Path("foo.txt"))
.use { fileChannel =>
// fileChannel is only valid in this lexical scope
useChannel(fileChannel)
Expand Down
14 changes: 7 additions & 7 deletions docs/essentials/sockets.md
Original file line number Diff line number Diff line change
Expand Up @@ -9,8 +9,8 @@ Required imports for snippets:

```scala mdoc:silent
import zio._
import zio.clock._
import zio.console._
import zio.Clock._
import zio.Console._
import zio.nio.channels._
import zio.nio._
```
Expand All @@ -21,11 +21,11 @@ Creating a server socket:

```scala mdoc:silent
val server = AsynchronousServerSocketChannel.open
.mapM { socket =>
.mapZIO { socket =>
for {
address <- InetSocketAddress.hostName("127.0.0.1", 1337)
_ <- socket.bindTo(address)
_ <- socket.accept.preallocate.flatMap(_.use(channel => doWork(channel).catchAll(ex => putStrLn(ex.getMessage))).fork).forever.fork
_ <- socket.accept.preallocate.flatMap(_.use(channel => doWork(channel).catchAll(ex => printLine(ex.getMessage))).fork).forever.fork
} yield ()
}.useForever

Expand All @@ -34,18 +34,18 @@ def doWork(channel: AsynchronousSocketChannel): ZIO[Console with Clock, Throwabl
for {
chunk <- channel.readChunk(3)
str = chunk.toArray.map(_.toChar).mkString
_ <- putStrLn(s"received: [$str] [${chunk.length}]")
_ <- printLine(s"received: [$str] [${chunk.length}]")
} yield ()

process.whenM(channel.isOpen).forever
process.whenZIO(channel.isOpen).forever
}
```

Creating a client socket:

```scala mdoc:silent
val clientM: Managed[Exception, AsynchronousSocketChannel] = AsynchronousSocketChannel.open
.mapM { client =>
.mapZIO { client =>
for {
host <- InetAddress.localHost
address <- InetSocketAddress.inetAddress(host, 2552)
Expand Down
22 changes: 13 additions & 9 deletions examples/src/main/scala/StreamDirWatch.scala
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package nio
package examples

import zio.nio.file.{Path, WatchService}
import zio.{Console, ZIOAppDefault}

import java.nio.file.{StandardWatchEventKinds, WatchEvent}

Expand All @@ -12,7 +13,7 @@ import java.nio.file.{StandardWatchEventKinds, WatchEvent}
* Note that on macOS the standard Java `WatchService` uses polling and so is a bit slow, and only registers at most one
* type of event for each directory member since the last poll.
*/
object StreamDirWatch extends App {
object StreamDirWatch extends ZIOAppDefault {

private def watch(dir: Path) =
WatchService.forDefaultFileSystem.use { service =>
Expand All @@ -26,8 +27,8 @@ object StreamDirWatch extends App {
),
maxDepth = 100
)
_ <- console.putStrLn(s"Watching directory '$dir'")
_ <- console.putStrLn("")
_ <- Console.printLine(s"Watching directory '$dir'")
_ <- Console.printLine("")
_ <- service.stream.foreach { key =>
val eventProcess = { (event: WatchEvent[_]) =>
val desc = event.kind() match {
Expand All @@ -38,16 +39,19 @@ object StreamDirWatch extends App {
case other => s"Unknown: $other"
}
val path = key.resolveEventPath(event).getOrElse("** PATH UNKNOWN **")
console.putStrLn(s"$desc, count: ${event.count()}, $path")
Console.printLine(s"$desc, count: ${event.count()}, $path")
}
key.pollEventsManaged.use(ZIO.foreach_(_)(eventProcess))
key.pollEventsManaged.use(ZIO.foreachDiscard(_)(eventProcess))
}
} yield ()
}

override def run(args: List[String]): URIO[zio.ZEnv, ExitCode] =
args.headOption
.map(dirString => watch(Path(dirString)).exitCode)
.getOrElse(console.putStrLn("A directory argument is required").exitCode)
override def run: URIO[zio.ZEnv with ZIOAppArgs, ExitCode] =
ZIO
.serviceWith[ZIOAppArgs](_.getArgs.toList.headOption)
.flatMap(
_.map(dirString => watch(Path(dirString)).exitCode)
.getOrElse(Console.printLine("A directory argument is required").exitCode)
)

}
39 changes: 18 additions & 21 deletions examples/src/main/scala/StreamsBasedServer.scala
Original file line number Diff line number Diff line change
@@ -1,55 +1,52 @@
package zio.nio.examples

import zio._
import zio.clock.Clock
import zio.duration._
import zio.nio.InetSocketAddress
import zio.nio.channels.AsynchronousServerSocketChannel
import zio.stream._
import zio.{Clock, Console, ExitCode, Managed, RIO, URIO, ZIO, ZIOAppDefault, ZTraceElement, durationInt}

object StreamsBasedServer extends App {
object StreamsBasedServer extends ZIOAppDefault {

def run(args: List[String]): URIO[zio.console.Console with Clock with zio.console.Console, ExitCode] =
def run: URIO[Console with Clock with Console, ExitCode] =
ZStream
.managed(server(8080))
.flatMap(handleConnections(_) { chunk =>
console.putStrLn(s"Read data: ${chunk.mkString}") *>
clock.sleep(2.seconds) *>
console.putStrLn("Done").ignore
Console.printLine(s"Read data: ${chunk.mkString}") *>
Clock.sleep(2.seconds) *>
Console.printLine("Done").ignore
})
.runDrain
.orDie
.exitCode

def server(port: Int): Managed[Exception, AsynchronousServerSocketChannel] =
def server(port: Int)(implicit trace: ZTraceElement): Managed[Exception, AsynchronousServerSocketChannel] =
for {
server <- AsynchronousServerSocketChannel.open
socketAddress <- InetSocketAddress.wildCard(port).toManaged_
_ <- server.bindTo(socketAddress).toManaged_
socketAddress <- InetSocketAddress.wildCard(port).toManaged
_ <- server.bindTo(socketAddress).toManaged
} yield server

def handleConnections[R <: console.Console](
def handleConnections[R <: Console](
server: AsynchronousServerSocketChannel
)(f: String => RIO[R, Unit]): ZStream[R, Throwable, Unit] =
)(f: String => RIO[R, Unit])(implicit trace: ZTraceElement): ZStream[R, Throwable, Unit] =
ZStream
.repeatEffect(server.accept.preallocate)
.map(conn => ZStream.managed(conn.ensuring(console.putStrLn("Connection closed").ignore).withEarlyRelease))
.repeatZIO(server.accept.preallocate)
.map(conn => ZStream.managed(conn.ensuring(Console.printLine("Connection closed").ignore).withEarlyRelease))
.flatMapPar[R, Throwable, Unit](16) { connection =>
connection.mapM { case (closeConn, channel) =>
connection.mapZIO { case (closeConn, channel) =>
for {
_ <- console.putStrLn("Received connection")
_ <- Console.printLine("Received connection")
data <- ZStream
.fromEffectOption(
channel.readChunk(64).tap(_ => console.putStrLn("Read chunk")).orElse(ZIO.fail(None))
.fromZIOOption(
channel.readChunk(64).tap(_ => Console.printLine("Read chunk")).orElse(ZIO.fail(None))
)
.flattenChunks
.take(4)
.transduce(ZTransducer.utf8Decode)
.via(ZPipeline.utf8Decode)
.run(Sink.foldLeft("")(_ + (_: String)))
_ <- closeConn
_ <- f(data)
} yield ()
}
}

}
Loading

0 comments on commit 84aa783

Please sign in to comment.