Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Use CE polling system for non-blocking I/O on Native #3087

Closed
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
36 commits
Select commit Hold shift + click to select a range
65e8dec
Bump to CE 3.5-4f9e57b
armanbilge Dec 17, 2022
073723b
Sketch `FdPollingSocket`
armanbilge Dec 17, 2022
ce77f22
Better error-handling in `ResizableBuffer`
armanbilge Dec 17, 2022
3bc1410
Impl socket close, add native util
armanbilge Dec 17, 2022
862ae58
Tidy unused type param
armanbilge Dec 17, 2022
9e475e8
Add socket address helpers
armanbilge Dec 17, 2022
f93c662
Implement `endOfInput`, `endOfOutput`
armanbilge Dec 17, 2022
06c0fe7
Wip socket reading
armanbilge Dec 17, 2022
a75a5ab
Take address as ctor args
armanbilge Dec 18, 2022
c104cd7
Implement reading
armanbilge Dec 18, 2022
c8e2316
Address warnings
armanbilge Dec 18, 2022
f0fd81d
Bikeshed
armanbilge Dec 18, 2022
86ceb06
Implement writing
armanbilge Dec 18, 2022
3eb0e18
Bump CE snapshot, adopt new fd polling api
armanbilge Dec 21, 2022
8889e05
First attempt at unix sockets
armanbilge Dec 22, 2022
5c62bc2
Cross-compile unixsockets tests
armanbilge Dec 22, 2022
adba432
Workaround another borked method
armanbilge Dec 22, 2022
0aa9260
Bump base version
armanbilge Dec 22, 2022
64d7a76
Implement non-blocking std{in,out,err}
armanbilge Dec 22, 2022
c27e03d
Fix unix socket error handling
armanbilge Dec 22, 2022
ebc0ca5
Simplify unix socket connect
armanbilge Dec 22, 2022
43b0c06
Fix simplified unix socket connect
armanbilge Dec 22, 2022
6e25eab
stackalloc `sockaddr_un` ftw
armanbilge Dec 22, 2022
1cf0db4
Add more socket option helpers
armanbilge Dec 22, 2022
74b80f9
`accept4` is a linux thing
armanbilge Dec 22, 2022
84fc9b1
First attempt at fd polling socket group
armanbilge Dec 22, 2022
1678d52
`raiseSocketError` -> `checkSocketError`
armanbilge Dec 22, 2022
b5b3a04
Expose new polling system `Network`
armanbilge Dec 22, 2022
e682a37
Fix exceptions, tweak test
armanbilge Dec 22, 2022
220f6bc
Add forgotten `guard`s
armanbilge Dec 22, 2022
ef299db
Workaround BSD `sa_family` quirk
armanbilge Dec 22, 2022
f6ecd2b
Unused import
armanbilge Dec 22, 2022
830564a
Fixing+debugging
armanbilge Dec 22, 2022
0ece5fc
Revert "Workaround BSD `sa_family` quirk"
armanbilge Dec 22, 2022
26a4e4f
Explicitly track if ipv4/ipv6 socket
armanbilge Dec 23, 2022
b0f71fe
Fix Scala 3 compile
armanbilge Dec 23, 2022
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 4 additions & 4 deletions build.sbt
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ import com.typesafe.tools.mima.core._

Global / onChangedBuildSource := ReloadOnSourceChanges

ThisBuild / tlBaseVersion := "3.4"
ThisBuild / tlBaseVersion := "3.5"

ThisBuild / organization := "co.fs2"
ThisBuild / organizationName := "Functional Streams for Scala"
Expand Down Expand Up @@ -209,9 +209,9 @@ lazy val core = crossProject(JVMPlatform, JSPlatform, NativePlatform)
libraryDependencies ++= Seq(
"org.typelevel" %%% "cats-core" % "2.9.0",
"org.typelevel" %%% "cats-laws" % "2.9.0" % Test,
"org.typelevel" %%% "cats-effect" % "3.4.2",
"org.typelevel" %%% "cats-effect-laws" % "3.4.2" % Test,
"org.typelevel" %%% "cats-effect-testkit" % "3.4.2" % Test,
"org.typelevel" %%% "cats-effect" % "3.5-9ba870f",
"org.typelevel" %%% "cats-effect-laws" % "3.5-9ba870f" % Test,
"org.typelevel" %%% "cats-effect-testkit" % "3.5-9ba870f" % Test,
"org.scodec" %%% "scodec-bits" % "1.1.34",
"org.typelevel" %%% "scalacheck-effect-munit" % "2.0.0-M2" % Test,
"org.typelevel" %%% "munit-cats-effect" % "2.0.0-M3" % Test,
Expand Down
30 changes: 0 additions & 30 deletions io/jvm-native/src/main/scala/fs2/io/iojvmnative.scala
Original file line number Diff line number Diff line change
Expand Up @@ -22,46 +22,16 @@
package fs2
package io

import cats._
import cats.effect.kernel.Sync
import cats.effect.kernel.implicits._
import cats.syntax.all._

import java.nio.charset.Charset
import java.nio.charset.StandardCharsets
import scala.reflect.ClassTag

private[fs2] trait iojvmnative {
type InterruptedIOException = java.io.InterruptedIOException
type ClosedChannelException = java.nio.channels.ClosedChannelException

//
// STDIN/STDOUT Helpers

/** Stream of bytes read asynchronously from standard input. */
def stdin[F[_]: Sync](bufSize: Int): Stream[F, Byte] =
readInputStream(Sync[F].blocking(System.in), bufSize, false)

/** Pipe of bytes that writes emitted values to standard output asynchronously. */
def stdout[F[_]: Sync]: Pipe[F, Byte, Nothing] =
writeOutputStream(Sync[F].blocking(System.out), false)

/** Pipe of bytes that writes emitted values to standard error asynchronously. */
def stderr[F[_]: Sync]: Pipe[F, Byte, Nothing] =
writeOutputStream(Sync[F].blocking(System.err), false)

/** Writes this stream to standard output asynchronously, converting each element to
* a sequence of bytes via `Show` and the given `Charset`.
*/
def stdoutLines[F[_]: Sync, O: Show](
charset: Charset = StandardCharsets.UTF_8
): Pipe[F, O, Nothing] =
_.map(_.show).through(text.encode(charset)).through(stdout)

/** Stream of `String` read asynchronously from standard input decoded in UTF-8. */
def stdinUtf8[F[_]: Sync](bufSize: Int): Stream[F, String] =
stdin(bufSize).through(text.utf8.decode)

/** Stream of bytes read asynchronously from the specified resource relative to the class `C`.
* @see [[readClassLoaderResource]] for a resource relative to a classloader.
*/
Expand Down
30 changes: 30 additions & 0 deletions io/jvm/src/main/scala/fs2/io/ioplatform.scala
Original file line number Diff line number Diff line change
Expand Up @@ -22,16 +22,46 @@
package fs2
package io

import cats.Show
import cats.effect.kernel.{Async, Outcome, Resource, Sync}
import cats.effect.kernel.implicits._
import cats.effect.kernel.Deferred
import cats.syntax.all._
import fs2.io.internal.PipedStreamBuffer

import java.io.{InputStream, OutputStream}
import java.nio.charset.Charset
import java.nio.charset.StandardCharsets

private[fs2] trait ioplatform extends iojvmnative {

//
// STDIN/STDOUT Helpers

/** Stream of bytes read asynchronously from standard input. */
def stdin[F[_]: Sync](bufSize: Int): Stream[F, Byte] =
readInputStream(Sync[F].blocking(System.in), bufSize, false)

/** Pipe of bytes that writes emitted values to standard output asynchronously. */
def stdout[F[_]: Sync]: Pipe[F, Byte, Nothing] =
writeOutputStream(Sync[F].blocking(System.out), false)

/** Pipe of bytes that writes emitted values to standard error asynchronously. */
def stderr[F[_]: Sync]: Pipe[F, Byte, Nothing] =
writeOutputStream(Sync[F].blocking(System.err), false)

/** Writes this stream to standard output asynchronously, converting each element to
* a sequence of bytes via `Show` and the given `Charset`.
*/
def stdoutLines[F[_]: Sync, O: Show](
charset: Charset = StandardCharsets.UTF_8
): Pipe[F, O, Nothing] =
_.map(_.show).through(text.encode(charset)).through(stdout)

/** Stream of `String` read asynchronously from standard input decoded in UTF-8. */
def stdinUtf8[F[_]: Sync](bufSize: Int): Stream[F, String] =
stdin(bufSize).through(text.utf8.decode)

/** Pipe that converts a stream of bytes to a stream that will emit a single `java.io.InputStream`,
* that is closed whenever the resulting stream terminates.
*
Expand Down
79 changes: 79 additions & 0 deletions io/native/src/main/scala/fs2/io/internal/NativeUtil.scala
Original file line number Diff line number Diff line change
@@ -0,0 +1,79 @@
/*
* Copyright (c) 2013 Functional Streams for Scala
*
* Permission is hereby granted, free of charge, to any person obtaining a copy of
* this software and associated documentation files (the "Software"), to deal in
* the Software without restriction, including without limitation the rights to
* use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of
* the Software, and to permit persons to whom the Software is furnished to do so,
* subject to the following conditions:
*
* The above copyright notice and this permission notice shall be included in all
* copies or substantial portions of the Software.
*
* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
* IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS
* FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR
* COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER
* IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN
* CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
*/

package fs2.io.internal

import cats.effect.Sync

import java.io.IOException
import java.net.BindException
import java.net.ConnectException
import scala.scalanative.annotation.alwaysinline
import scala.scalanative.libc.errno._
import scala.scalanative.posix.fcntl._
import scala.scalanative.posix.errno._
import scala.scalanative.posix.string._
import scala.scalanative.unsafe._

private[io] object NativeUtil {

@alwaysinline def guard_(thunk: => CInt): Unit = {
guard(thunk)
()
}

@alwaysinline def guard(thunk: => CInt): CInt = {
val rtn = thunk
if (rtn < 0) {
val e = errno
if (e == EAGAIN || e == EWOULDBLOCK)
rtn
else throw errnoToThrowable(e)
} else
rtn
}

@alwaysinline def guardSSize(thunk: => CSSize): CSSize = {
val rtn = thunk
if (rtn < 0) {
val e = errno
if (e == EAGAIN || e == EWOULDBLOCK)
rtn
else throw errnoToThrowable(e)
} else
rtn
}

@alwaysinline def errnoToThrowable(e: CInt): Throwable = {
val msg = fromCString(strerror(e))
if (e == EADDRINUSE /* || e == EADDRNOTAVAIL */ )
new BindException(msg)
else if (e == ECONNREFUSED)
new ConnectException(msg)
else
new IOException(msg)
}

def setNonBlocking[F[_]](fd: CInt)(implicit F: Sync[F]): F[Unit] = F.delay {
guard_(fcntl(fd, F_SETFL, O_NONBLOCK))
}

}
44 changes: 26 additions & 18 deletions io/native/src/main/scala/fs2/io/internal/ResizableBuffer.scala
Original file line number Diff line number Diff line change
Expand Up @@ -21,44 +21,52 @@

package fs2.io.internal

import cats.effect.kernel.Async
import cats.effect.kernel.Resource
import cats.effect.kernel.Sync
import cats.effect.std.Semaphore
import cats.syntax.all._

import scala.scalanative.libc.errno._
import scala.scalanative.libc.stdlib._
import scala.scalanative.posix.string._
import scala.scalanative.unsafe._
import scala.scalanative.unsigned._

private[io] final class ResizableBuffer[F[_]] private (
semaphore: Semaphore[F],
private var ptr: Ptr[Byte],
private[this] var size: Int
)(implicit F: Sync[F]) {
)(implicit F: Async[F]) {

def get(size: Int): F[Ptr[Byte]] = F.delay {
if (size <= this.size)
F.pure(ptr)
else {
ptr = realloc(ptr, size.toUInt)
this.size = size
if (ptr == null)
F.raiseError[Ptr[Byte]](new RuntimeException(s"realloc: ${errno}"))
else F.pure(ptr)
def get(size: Int): Resource[F, Ptr[Byte]] = semaphore.permit.evalMap { _ =>
F.delay {
if (size <= this.size)
ptr
else {
ptr = realloc(ptr, size.toUInt)
this.size = size
if (ptr == null)
throw new RuntimeException(fromCString(strerror(errno)))
else ptr
}
}
}.flatten
}

}

private[io] object ResizableBuffer {

def apply[F[_]](size: Int)(implicit F: Sync[F]): Resource[F, ResizableBuffer[F]] =
def apply[F[_]](size: Int)(implicit F: Async[F]): Resource[F, ResizableBuffer[F]] =
Resource.make {
F.delay {
val ptr = malloc(size.toUInt)
if (ptr == null)
throw new RuntimeException(s"malloc: ${errno}")
else new ResizableBuffer(ptr, size)
Semaphore[F](1).flatMap { semaphore =>
F.delay {
val ptr = malloc(size.toUInt)
if (ptr == null)
throw new RuntimeException(fromCString(strerror(errno)))
else new ResizableBuffer(semaphore, ptr, size)
}
}

}(buf => F.delay(free(buf.ptr)))

}
Loading