From 65e8dec280efa4554d93fb15b0ce9ccf0c0bb6a7 Mon Sep 17 00:00:00 2001 From: Arman Bilge Date: Sat, 17 Dec 2022 20:46:42 +0000 Subject: [PATCH 01/36] Bump to CE 3.5-4f9e57b --- build.sbt | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/build.sbt b/build.sbt index 3599a5ecae..b8d7c616ac 100644 --- a/build.sbt +++ b/build.sbt @@ -209,9 +209,9 @@ lazy val core = crossProject(JVMPlatform, JSPlatform, NativePlatform) libraryDependencies ++= Seq( "org.typelevel" %%% "cats-core" % "2.9.0", "org.typelevel" %%% "cats-laws" % "2.9.0" % Test, - "org.typelevel" %%% "cats-effect" % "3.4.2", - "org.typelevel" %%% "cats-effect-laws" % "3.4.2" % Test, - "org.typelevel" %%% "cats-effect-testkit" % "3.4.2" % Test, + "org.typelevel" %%% "cats-effect" % "3.5-4f9e57b", + "org.typelevel" %%% "cats-effect-laws" % "3.5-4f9e57b" % Test, + "org.typelevel" %%% "cats-effect-testkit" % "3.5-4f9e57b" % Test, "org.scodec" %%% "scodec-bits" % "1.1.34", "org.typelevel" %%% "scalacheck-effect-munit" % "2.0.0-M2" % Test, "org.typelevel" %%% "munit-cats-effect" % "2.0.0-M3" % Test, From 073723bb0d5d6271f6b351c1ad9a0ab56f559780 Mon Sep 17 00:00:00 2001 From: Arman Bilge Date: Sat, 17 Dec 2022 21:10:19 +0000 Subject: [PATCH 02/36] Sketch `FdPollingSocket` --- .../scala/fs2/io/net/FdPollingSocket.scala | 69 +++++++++++++++++++ 1 file changed, 69 insertions(+) create mode 100644 io/native/src/main/scala/fs2/io/net/FdPollingSocket.scala diff --git a/io/native/src/main/scala/fs2/io/net/FdPollingSocket.scala b/io/native/src/main/scala/fs2/io/net/FdPollingSocket.scala new file mode 100644 index 0000000000..32480881c5 --- /dev/null +++ b/io/native/src/main/scala/fs2/io/net/FdPollingSocket.scala @@ -0,0 +1,69 @@ +/* + * Copyright (c) 2013 Functional Streams for Scala + * + * Permission is hereby granted, free of charge, to any person obtaining a copy of + * this software and associated documentation files (the "Software"), to deal in + * the Software without restriction, including without limitation the rights to + * use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of + * the Software, and to permit persons to whom the Software is furnished to do so, + * subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS + * FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR + * COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER + * IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN + * CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. + */ + +package fs2 +package io.net + +import cats.effect.std.Mutex +import cats.effect.unsafe.FileDescriptorPoller +import com.comcast.ip4s.IpAddress +import com.comcast.ip4s.SocketAddress +import fs2.io.internal.ResizableBuffer + +import java.util.concurrent.atomic.AtomicReference + +import FdPollingSocket._ + +private final class FdPollingSocket[F[_]]( + fd: Int, + readBuffer: ResizableBuffer[F], + readMutex: Mutex[F], + writeMutex: Mutex[F] +) extends Socket[F] + with FileDescriptorPoller.Callback { + + def isOpen: F[Boolean] = ??? + + def localAddress: F[SocketAddress[IpAddress]] = ??? + def remoteAddress: F[SocketAddress[IpAddress]] = ??? + + def endOfInput: F[Unit] = ??? + def endOfOutput: F[Unit] = ??? + + private[this] val readCallback = new AtomicReference[Either[Throwable, Unit] => Unit] + private[this] val writeCallback = new AtomicReference[Either[Throwable, Unit] => Unit] + + def notifyFileDescriptorEvents(readReady: Boolean, writeReady: Boolean): Unit = ??? + + def read(maxBytes: Int): F[Option[Chunk[Byte]]] = ??? + def readN(numBytes: Int): F[Chunk[Byte]] = ??? + def reads: Stream[F, Byte] = ??? + + def write(bytes: Chunk[Byte]): F[Unit] = ??? + def writes: Pipe[F, Byte, Nothing] = ??? + +} + +private object FdPollingSocket { + + private val ReadySentinel: Either[Throwable, Unit] => Unit = _ => () + +} From ce77f22421642bf0dfa3752587d7a3c157696940 Mon Sep 17 00:00:00 2001 From: Arman Bilge Date: Sat, 17 Dec 2022 21:40:56 +0000 Subject: [PATCH 03/36] Better error-handling in `ResizableBuffer` --- .../main/scala/fs2/io/internal/ResizableBuffer.scala | 12 ++++++------ 1 file changed, 6 insertions(+), 6 deletions(-) diff --git a/io/native/src/main/scala/fs2/io/internal/ResizableBuffer.scala b/io/native/src/main/scala/fs2/io/internal/ResizableBuffer.scala index 51f4808cf6..1ec4310bd0 100644 --- a/io/native/src/main/scala/fs2/io/internal/ResizableBuffer.scala +++ b/io/native/src/main/scala/fs2/io/internal/ResizableBuffer.scala @@ -23,10 +23,10 @@ package fs2.io.internal import cats.effect.kernel.Resource import cats.effect.kernel.Sync -import cats.syntax.all._ import scala.scalanative.libc.errno._ import scala.scalanative.libc.stdlib._ +import scala.scalanative.posix.string._ import scala.scalanative.unsafe._ import scala.scalanative.unsigned._ @@ -37,15 +37,15 @@ private[io] final class ResizableBuffer[F[_]] private ( def get(size: Int): F[Ptr[Byte]] = F.delay { if (size <= this.size) - F.pure(ptr) + ptr else { ptr = realloc(ptr, size.toUInt) this.size = size if (ptr == null) - F.raiseError[Ptr[Byte]](new RuntimeException(s"realloc: ${errno}")) - else F.pure(ptr) + throw new RuntimeException(fromCString(strerror(errno))) + else ptr } - }.flatten + } } @@ -56,7 +56,7 @@ private[io] object ResizableBuffer { F.delay { val ptr = malloc(size.toUInt) if (ptr == null) - throw new RuntimeException(s"malloc: ${errno}") + throw new RuntimeException(fromCString(strerror(errno))) else new ResizableBuffer(ptr, size) } }(buf => F.delay(free(buf.ptr))) From 3bc141099b0030cbc7bb10029a10636d34b6ce01 Mon Sep 17 00:00:00 2001 From: Arman Bilge Date: Sat, 17 Dec 2022 21:57:01 +0000 Subject: [PATCH 04/36] Impl socket close, add native util --- .../scala/fs2/io/internal/NativeUtil.scala | 45 +++++++++++++++++++ .../scala/fs2/io/net/FdPollingSocket.scala | 17 +++++-- 2 files changed, 59 insertions(+), 3 deletions(-) create mode 100644 io/native/src/main/scala/fs2/io/internal/NativeUtil.scala diff --git a/io/native/src/main/scala/fs2/io/internal/NativeUtil.scala b/io/native/src/main/scala/fs2/io/internal/NativeUtil.scala new file mode 100644 index 0000000000..8b3062d5ec --- /dev/null +++ b/io/native/src/main/scala/fs2/io/internal/NativeUtil.scala @@ -0,0 +1,45 @@ +/* + * Copyright (c) 2013 Functional Streams for Scala + * + * Permission is hereby granted, free of charge, to any person obtaining a copy of + * this software and associated documentation files (the "Software"), to deal in + * the Software without restriction, including without limitation the rights to + * use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of + * the Software, and to permit persons to whom the Software is furnished to do so, + * subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS + * FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR + * COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER + * IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN + * CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. + */ + +package fs2.io.internal + +import scala.scalanative.annotation.alwaysinline +import scala.scalanative.libc.errno._ +import scala.scalanative.posix.string._ +import scala.scalanative.unsafe._ +import java.io.IOException + +private[io] object NativeUtil { + + @alwaysinline def guard_(thunk: => CInt): Unit = { + guard(thunk) + () + } + + @alwaysinline def guard(thunk: => CInt): CInt = { + val rtn = thunk + if (rtn < 0) + throw new IOException(fromCString(strerror(errno))) + else + rtn + } + +} diff --git a/io/native/src/main/scala/fs2/io/net/FdPollingSocket.scala b/io/native/src/main/scala/fs2/io/net/FdPollingSocket.scala index 32480881c5..6b25d18a77 100644 --- a/io/native/src/main/scala/fs2/io/net/FdPollingSocket.scala +++ b/io/native/src/main/scala/fs2/io/net/FdPollingSocket.scala @@ -22,12 +22,15 @@ package fs2 package io.net +import cats.effect.kernel.Async import cats.effect.std.Mutex import cats.effect.unsafe.FileDescriptorPoller import com.comcast.ip4s.IpAddress import com.comcast.ip4s.SocketAddress +import fs2.io.internal.NativeUtil._ import fs2.io.internal.ResizableBuffer +import scala.scalanative.posix.unistd import java.util.concurrent.atomic.AtomicReference import FdPollingSocket._ @@ -37,10 +40,18 @@ private final class FdPollingSocket[F[_]]( readBuffer: ResizableBuffer[F], readMutex: Mutex[F], writeMutex: Mutex[F] -) extends Socket[F] +)(implicit F: Async[F]) + extends Socket[F] with FileDescriptorPoller.Callback { - def isOpen: F[Boolean] = ??? + @volatile private[this] var open = true + + def isOpen: F[Boolean] = F.delay(open) + + def close: F[Unit] = F.delay { + open = false + guard_(unistd.close(fd)) + } def localAddress: F[SocketAddress[IpAddress]] = ??? def remoteAddress: F[SocketAddress[IpAddress]] = ??? @@ -49,7 +60,7 @@ private final class FdPollingSocket[F[_]]( def endOfOutput: F[Unit] = ??? private[this] val readCallback = new AtomicReference[Either[Throwable, Unit] => Unit] - private[this] val writeCallback = new AtomicReference[Either[Throwable, Unit] => Unit] + private[this] val writeCallback = new AtomicReference[Either[Throwable, Unit] => Unit] def notifyFileDescriptorEvents(readReady: Boolean, writeReady: Boolean): Unit = ??? From 862ae581e0a5828f4c60d4be89ee9b9f0ebc1c43 Mon Sep 17 00:00:00 2001 From: Arman Bilge Date: Sat, 17 Dec 2022 21:57:23 +0000 Subject: [PATCH 05/36] Tidy unused type param --- io/native/src/main/scala/fs2/io/net/tls/s2nutil.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/io/native/src/main/scala/fs2/io/net/tls/s2nutil.scala b/io/native/src/main/scala/fs2/io/net/tls/s2nutil.scala index 66590f5d3b..c52eeca612 100644 --- a/io/native/src/main/scala/fs2/io/net/tls/s2nutil.scala +++ b/io/native/src/main/scala/fs2/io/net/tls/s2nutil.scala @@ -44,7 +44,7 @@ private[tls] object s2nutil { throw new S2nException(error) } - @alwaysinline def guard[A](thunk: => CInt): CInt = { + @alwaysinline def guard(thunk: => CInt): CInt = { val rtn = thunk if (rtn < 0) { val error = !s2n_errno_location() From 9e475e84239d7acf2f5971c9ca84f410b42f4644 Mon Sep 17 00:00:00 2001 From: Arman Bilge Date: Sat, 17 Dec 2022 22:37:38 +0000 Subject: [PATCH 06/36] Add socket address helpers Co-authored-by: Lee Tibbert --- .../io/internal/SocketAddressHelpers.scala | 193 ++++++++++++++++++ .../main/scala/fs2/io/internal/netinet.scala | 90 ++++++++ .../scala/fs2/io/net/FdPollingSocket.scala | 6 +- 3 files changed, 287 insertions(+), 2 deletions(-) create mode 100644 io/native/src/main/scala/fs2/io/internal/SocketAddressHelpers.scala create mode 100644 io/native/src/main/scala/fs2/io/internal/netinet.scala diff --git a/io/native/src/main/scala/fs2/io/internal/SocketAddressHelpers.scala b/io/native/src/main/scala/fs2/io/internal/SocketAddressHelpers.scala new file mode 100644 index 0000000000..a0a3ad2e76 --- /dev/null +++ b/io/native/src/main/scala/fs2/io/internal/SocketAddressHelpers.scala @@ -0,0 +1,193 @@ +/* + * Copyright (c) 2013 Functional Streams for Scala + * + * Permission is hereby granted, free of charge, to any person obtaining a copy of + * this software and associated documentation files (the "Software"), to deal in + * the Software without restriction, including without limitation the rights to + * use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of + * the Software, and to permit persons to whom the Software is furnished to do so, + * subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS + * FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR + * COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER + * IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN + * CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. + */ + +package fs2.io.internal + +import cats.effect.kernel.Resource +import cats.effect.kernel.Sync +import cats.syntax.all._ +import com.comcast.ip4s.IpAddress +import com.comcast.ip4s.Ipv4Address +import com.comcast.ip4s.Ipv6Address +import com.comcast.ip4s.Port +import com.comcast.ip4s.SocketAddress + +import java.io.IOException +import scala.scalanative.posix.arpa.inet._ +import scala.scalanative.posix.sys.socket._ +import scala.scalanative.posix.sys.socketOps._ +import scala.scalanative.unsafe._ +import scala.scalanative.unsigned._ + +import NativeUtil._ +import netinetin._ +import netinetinOps._ + +private[io] object SocketAddressHelpers { + + def getLocalAddress[F[_]](fd: Int)(implicit F: Sync[F]): F[SocketAddress[IpAddress]] = + SocketAddressHelpers.toSocketAddress { (addr, len) => + F.delay(guard_(getsockname(fd, addr, len))) + } + + def allocateSockaddr[F[_]](implicit F: Sync[F]): Resource[F, (Ptr[sockaddr], Ptr[socklen_t])] = + Resource + .make(F.delay(Zone.open()))(z => F.delay(z.close())) + .evalMap { implicit z => + F.delay { + val addr = // allocate enough for an IPv6 + alloc[sockaddr_in6]().asInstanceOf[Ptr[sockaddr]] + val len = alloc[socklen_t]() + (addr, len) + } + } + + def toSockaddr[A]( + address: SocketAddress[IpAddress] + )(f: (Ptr[sockaddr], socklen_t) => A): A = + address.host.fold( + _ => + toSockaddrIn(address.asInstanceOf[SocketAddress[Ipv4Address]])( + f.asInstanceOf[(Ptr[sockaddr_in], socklen_t) => A] + ), + _ => + toSockaddrIn6(address.asInstanceOf[SocketAddress[Ipv6Address]])( + f.asInstanceOf[(Ptr[sockaddr_in6], socklen_t) => A] + ) + ) + + private[this] def toSockaddrIn[A]( + address: SocketAddress[Ipv4Address] + )(f: (Ptr[sockaddr_in], socklen_t) => A): A = { + val addr = stackalloc[sockaddr_in]() + val len = stackalloc[socklen_t]() + + toSockaddrIn(address, addr, len) + + f(addr, !len) + } + + private[this] def toSockaddrIn6[A]( + address: SocketAddress[Ipv6Address] + )(f: (Ptr[sockaddr_in6], socklen_t) => A): A = { + val addr = stackalloc[sockaddr_in6]() + val len = stackalloc[socklen_t]() + + toSockaddrIn6(address, addr, len) + + f(addr, !len) + } + + def toSockaddr( + address: SocketAddress[IpAddress], + addr: Ptr[sockaddr], + len: Ptr[socklen_t] + ): Unit = + address.host.fold( + _ => + toSockaddrIn( + address.asInstanceOf[SocketAddress[Ipv4Address]], + addr.asInstanceOf[Ptr[sockaddr_in]], + len + ), + _ => + toSockaddrIn6( + address.asInstanceOf[SocketAddress[Ipv6Address]], + addr.asInstanceOf[Ptr[sockaddr_in6]], + len + ) + ) + + private[this] def toSockaddrIn( + address: SocketAddress[Ipv4Address], + addr: Ptr[sockaddr_in], + len: Ptr[socklen_t] + ): Unit = { + !len = sizeof[sockaddr_in].toUInt + addr.sin_family = AF_INET.toUShort + addr.sin_port = htons(address.port.value.toUShort) + addr.sin_addr.s_addr = htonl(address.host.toLong.toUInt) + } + + private[this] def toSockaddrIn6[A]( + address: SocketAddress[Ipv6Address], + addr: Ptr[sockaddr_in6], + len: Ptr[socklen_t] + ): Unit = { + !len = sizeof[sockaddr_in6].toUInt + + addr.sin6_family = AF_INET6.toUShort + addr.sin6_port = htons(address.port.value.toUShort) + + val bytes = address.host.toBytes + var i = 0 + while (i < 0) { + addr.sin6_addr.s6_addr(i) = bytes(i).toUByte + i += 1 + } + } + + def toSocketAddress[F[_]]( + f: (Ptr[sockaddr], Ptr[socklen_t]) => F[Unit] + )(implicit F: Sync[F]): F[SocketAddress[IpAddress]] = { + val addr = // allocate enough for an IPv6 + stackalloc[sockaddr_in6]().asInstanceOf[Ptr[sockaddr]] + val len = stackalloc[socklen_t]() + !len = sizeof[sockaddr_in6].toUInt + + f(addr, len) *> toSocketAddress(addr) + } + + def toSocketAddress[F[_]](addr: Ptr[sockaddr])(implicit F: Sync[F]): F[SocketAddress[IpAddress]] = + if (addr.sa_family.toInt == AF_INET) + F.pure(toIpv4SocketAddress(addr.asInstanceOf[Ptr[sockaddr_in]])) + else if (addr.sa_family.toInt == AF_INET6) + F.pure(toIpv6SocketAddress(addr.asInstanceOf[Ptr[sockaddr_in6]])) + else + F.raiseError(new IOException(s"Unsupported sa_family: ${addr.sa_family}")) + + private[this] def toIpv4SocketAddress(addr: Ptr[sockaddr_in]): SocketAddress[Ipv4Address] = { + val port = Port.fromInt(ntohs(addr.sin_port).toInt).get + val addrBytes = addr.sin_addr.at1.asInstanceOf[Ptr[Byte]] + val host = Ipv4Address.fromBytes( + addrBytes(0).toInt, + addrBytes(1).toInt, + addrBytes(2).toInt, + addrBytes(3).toInt + ) + SocketAddress(host, port) + } + + private[this] def toIpv6SocketAddress(addr: Ptr[sockaddr_in6]): SocketAddress[Ipv6Address] = { + val port = Port.fromInt(ntohs(addr.sin6_port).toInt).get + val addrBytes = addr.sin6_addr.at1.asInstanceOf[Ptr[Byte]] + val host = Ipv6Address.fromBytes { + val addr = new Array[Byte](16) + var i = 0 + while (i < addr.length) { + addr(i) = addrBytes(i.toLong) + i += 1 + } + addr + }.get + SocketAddress(host, port) + } +} diff --git a/io/native/src/main/scala/fs2/io/internal/netinet.scala b/io/native/src/main/scala/fs2/io/internal/netinet.scala new file mode 100644 index 0000000000..fdb1dc4afc --- /dev/null +++ b/io/native/src/main/scala/fs2/io/internal/netinet.scala @@ -0,0 +1,90 @@ +/* + * Copyright (c) 2013 Functional Streams for Scala + * + * Permission is hereby granted, free of charge, to any person obtaining a copy of + * this software and associated documentation files (the "Software"), to deal in + * the Software without restriction, including without limitation the rights to + * use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of + * the Software, and to permit persons to whom the Software is furnished to do so, + * subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS + * FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR + * COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER + * IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN + * CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. + */ + +package fs2.io.internal + +import scalanative.unsafe._ +import scalanative.posix.inttypes._ +import scalanative.posix.sys.socket._ + +private[io] object netinetin { + import Nat._ + type _16 = Digit2[_1, _6] + + type in_port_t = uint16_t + + type in_addr = CStruct1[uint32_t] + + type sockaddr_in = CStruct4[ + sa_family_t, + in_port_t, + in_addr, + CArray[Byte, _8] + ] + + type in6_addr = CStruct1[CArray[CUnsignedChar, _16]] + + type sockaddr_in6 = CStruct5[ + sa_family_t, + in_port_t, + uint32_t, + in6_addr, + uint32_t + ] + +} + +private[io] object netinetinOps { + import netinetin._ + + implicit final class in_addrOps(val in_addr: in_addr) extends AnyVal { + def s_addr: uint32_t = in_addr._1 + def s_addr_=(s_addr: uint32_t): Unit = in_addr._1 = s_addr + } + + implicit final class sockaddr_inOps(val sockaddr_in: Ptr[sockaddr_in]) extends AnyVal { + def sin_family: sa_family_t = sockaddr_in._1 + def sin_family_=(sin_family: sa_family_t): Unit = sockaddr_in._1 = sin_family + def sin_port: in_port_t = sockaddr_in._2 + def sin_port_=(sin_port: in_port_t): Unit = sockaddr_in._2 = sin_port + def sin_addr: in_addr = sockaddr_in._3 + def sin_addr_=(sin_addr: in_addr) = sockaddr_in._3 = sin_addr + } + + implicit final class in6_addrOps(val in6_addr: in6_addr) extends AnyVal { + def s6_addr: CArray[uint8_t, _16] = in6_addr._1 + def s6_addr_=(s6_addr: CArray[uint8_t, _16]): Unit = in6_addr._1 = s6_addr + } + + implicit final class sockaddr_in6Ops(val sockaddr_in6: Ptr[sockaddr_in6]) extends AnyVal { + def sin6_family: sa_family_t = sockaddr_in6._1 + def sin6_family_=(sin6_family: sa_family_t): Unit = sockaddr_in6._1 = sin6_family + def sin6_port: in_port_t = sockaddr_in6._2 + def sin6_port_=(sin6_port: in_port_t): Unit = sockaddr_in6._2 = sin6_port + def sin6_flowinfo: uint32_t = sockaddr_in6._3 + def sin6_flowinfo_=(sin6_flowinfo: uint32_t): Unit = sockaddr_in6._3 = sin6_flowinfo + def sin6_addr: in6_addr = sockaddr_in6._4 + def sin6_addr_=(sin6_addr: in6_addr) = sockaddr_in6._4 = sin6_addr + def sin6_scope_id: uint32_t = sockaddr_in6._5 + def sin6_scope_id_=(sin6_scope_id: uint32_t): Unit = sockaddr_in6._5 = sin6_scope_id + } + +} diff --git a/io/native/src/main/scala/fs2/io/net/FdPollingSocket.scala b/io/native/src/main/scala/fs2/io/net/FdPollingSocket.scala index 6b25d18a77..f8a57719f9 100644 --- a/io/native/src/main/scala/fs2/io/net/FdPollingSocket.scala +++ b/io/native/src/main/scala/fs2/io/net/FdPollingSocket.scala @@ -29,6 +29,7 @@ import com.comcast.ip4s.IpAddress import com.comcast.ip4s.SocketAddress import fs2.io.internal.NativeUtil._ import fs2.io.internal.ResizableBuffer +import fs2.io.internal.SocketAddressHelpers._ import scala.scalanative.posix.unistd import java.util.concurrent.atomic.AtomicReference @@ -37,6 +38,7 @@ import FdPollingSocket._ private final class FdPollingSocket[F[_]]( fd: Int, + _remoteAddress: SocketAddress[IpAddress], readBuffer: ResizableBuffer[F], readMutex: Mutex[F], writeMutex: Mutex[F] @@ -53,8 +55,8 @@ private final class FdPollingSocket[F[_]]( guard_(unistd.close(fd)) } - def localAddress: F[SocketAddress[IpAddress]] = ??? - def remoteAddress: F[SocketAddress[IpAddress]] = ??? + def localAddress: F[SocketAddress[IpAddress]] = getLocalAddress(fd) + def remoteAddress: F[SocketAddress[IpAddress]] = F.pure(_remoteAddress) def endOfInput: F[Unit] = ??? def endOfOutput: F[Unit] = ??? From f93c66265dc1d45eb54596518c31aacdba453cf1 Mon Sep 17 00:00:00 2001 From: Arman Bilge Date: Sat, 17 Dec 2022 22:48:03 +0000 Subject: [PATCH 07/36] Implement `endOfInput`, `endOfOutput` --- io/native/src/main/scala/fs2/io/net/FdPollingSocket.scala | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/io/native/src/main/scala/fs2/io/net/FdPollingSocket.scala b/io/native/src/main/scala/fs2/io/net/FdPollingSocket.scala index f8a57719f9..43d4bba24b 100644 --- a/io/native/src/main/scala/fs2/io/net/FdPollingSocket.scala +++ b/io/native/src/main/scala/fs2/io/net/FdPollingSocket.scala @@ -31,6 +31,7 @@ import fs2.io.internal.NativeUtil._ import fs2.io.internal.ResizableBuffer import fs2.io.internal.SocketAddressHelpers._ +import scala.scalanative.posix.sys.socket._ import scala.scalanative.posix.unistd import java.util.concurrent.atomic.AtomicReference @@ -58,8 +59,8 @@ private final class FdPollingSocket[F[_]]( def localAddress: F[SocketAddress[IpAddress]] = getLocalAddress(fd) def remoteAddress: F[SocketAddress[IpAddress]] = F.pure(_remoteAddress) - def endOfInput: F[Unit] = ??? - def endOfOutput: F[Unit] = ??? + def endOfInput: F[Unit] = F.delay(guard_(shutdown(fd, 0))) + def endOfOutput: F[Unit] = F.delay(guard_(shutdown(fd, 1))) private[this] val readCallback = new AtomicReference[Either[Throwable, Unit] => Unit] private[this] val writeCallback = new AtomicReference[Either[Throwable, Unit] => Unit] From 06c0fe76d77e728a49dc6bff419e4dc47604ad17 Mon Sep 17 00:00:00 2001 From: Arman Bilge Date: Sat, 17 Dec 2022 23:14:53 +0000 Subject: [PATCH 08/36] Wip socket reading --- .../scala/fs2/io/net/FdPollingSocket.scala | 24 ++++++++++++++++++- 1 file changed, 23 insertions(+), 1 deletion(-) diff --git a/io/native/src/main/scala/fs2/io/net/FdPollingSocket.scala b/io/native/src/main/scala/fs2/io/net/FdPollingSocket.scala index 43d4bba24b..a9035cd4bb 100644 --- a/io/native/src/main/scala/fs2/io/net/FdPollingSocket.scala +++ b/io/native/src/main/scala/fs2/io/net/FdPollingSocket.scala @@ -25,6 +25,7 @@ package io.net import cats.effect.kernel.Async import cats.effect.std.Mutex import cats.effect.unsafe.FileDescriptorPoller +import cats.syntax.all._ import com.comcast.ip4s.IpAddress import com.comcast.ip4s.SocketAddress import fs2.io.internal.NativeUtil._ @@ -33,6 +34,7 @@ import fs2.io.internal.SocketAddressHelpers._ import scala.scalanative.posix.sys.socket._ import scala.scalanative.posix.unistd +import scala.scalanative.unsafe._ import java.util.concurrent.atomic.AtomicReference import FdPollingSocket._ @@ -65,7 +67,27 @@ private final class FdPollingSocket[F[_]]( private[this] val readCallback = new AtomicReference[Either[Throwable, Unit] => Unit] private[this] val writeCallback = new AtomicReference[Either[Throwable, Unit] => Unit] - def notifyFileDescriptorEvents(readReady: Boolean, writeReady: Boolean): Unit = ??? + def notifyFileDescriptorEvents(readReady: Boolean, writeReady: Boolean): Unit = { + if (readReady) { + val cb = readCallback.getAndSet(ReadySentinel) + if (cb ne null) cb(Either.unit) + } + if (writeReady) { + val cb = writeCallback.getAndSet(ReadySentinel) + if (cb ne null) cb(Either.unit) + } + } + + def awaitReadReady: F[Unit] = F.async { cb => + F.delay { + if (readCallback.compareAndSet(null, cb)) + Some(F.delay(readCallback.compareAndSet(cb, null))) + else { + cb(Either.unit) + None + } + } + } def read(maxBytes: Int): F[Option[Chunk[Byte]]] = ??? def readN(numBytes: Int): F[Chunk[Byte]] = ??? From a75a5abbdb6653a16f707509454b98de2caefecb Mon Sep 17 00:00:00 2001 From: Arman Bilge Date: Sun, 18 Dec 2022 00:50:01 +0000 Subject: [PATCH 09/36] Take address as ctor args --- io/native/src/main/scala/fs2/io/net/FdPollingSocket.scala | 8 +++----- 1 file changed, 3 insertions(+), 5 deletions(-) diff --git a/io/native/src/main/scala/fs2/io/net/FdPollingSocket.scala b/io/native/src/main/scala/fs2/io/net/FdPollingSocket.scala index a9035cd4bb..1084464a26 100644 --- a/io/native/src/main/scala/fs2/io/net/FdPollingSocket.scala +++ b/io/native/src/main/scala/fs2/io/net/FdPollingSocket.scala @@ -41,10 +41,11 @@ import FdPollingSocket._ private final class FdPollingSocket[F[_]]( fd: Int, - _remoteAddress: SocketAddress[IpAddress], readBuffer: ResizableBuffer[F], readMutex: Mutex[F], - writeMutex: Mutex[F] + writeMutex: Mutex[F], + val localAddress: F[SocketAddress[IpAddress]], + val remoteAddress: F[SocketAddress[IpAddress]] )(implicit F: Async[F]) extends Socket[F] with FileDescriptorPoller.Callback { @@ -58,9 +59,6 @@ private final class FdPollingSocket[F[_]]( guard_(unistd.close(fd)) } - def localAddress: F[SocketAddress[IpAddress]] = getLocalAddress(fd) - def remoteAddress: F[SocketAddress[IpAddress]] = F.pure(_remoteAddress) - def endOfInput: F[Unit] = F.delay(guard_(shutdown(fd, 0))) def endOfOutput: F[Unit] = F.delay(guard_(shutdown(fd, 1))) From c104cd71c4148c5b4aa5be43110d6bdc4d31652c Mon Sep 17 00:00:00 2001 From: Arman Bilge Date: Sun, 18 Dec 2022 01:24:52 +0000 Subject: [PATCH 10/36] Implement reading --- .../scala/fs2/io/internal/NativeUtil.scala | 11 +++-- .../scala/fs2/io/net/FdPollingSocket.scala | 41 +++++++++++++++++-- 2 files changed, 46 insertions(+), 6 deletions(-) diff --git a/io/native/src/main/scala/fs2/io/internal/NativeUtil.scala b/io/native/src/main/scala/fs2/io/internal/NativeUtil.scala index 8b3062d5ec..9f0fe5cbe5 100644 --- a/io/native/src/main/scala/fs2/io/internal/NativeUtil.scala +++ b/io/native/src/main/scala/fs2/io/internal/NativeUtil.scala @@ -23,6 +23,7 @@ package fs2.io.internal import scala.scalanative.annotation.alwaysinline import scala.scalanative.libc.errno._ +import scala.scalanative.posix.errno._ import scala.scalanative.posix.string._ import scala.scalanative.unsafe._ import java.io.IOException @@ -36,9 +37,13 @@ private[io] object NativeUtil { @alwaysinline def guard(thunk: => CInt): CInt = { val rtn = thunk - if (rtn < 0) - throw new IOException(fromCString(strerror(errno))) - else + if (rtn < 0) { + val en = errno + if (en == EAGAIN || en == EWOULDBLOCK) + rtn + else + throw new IOException(fromCString(strerror(errno))) + } else rtn } diff --git a/io/native/src/main/scala/fs2/io/net/FdPollingSocket.scala b/io/native/src/main/scala/fs2/io/net/FdPollingSocket.scala index 1084464a26..c8919fb760 100644 --- a/io/native/src/main/scala/fs2/io/net/FdPollingSocket.scala +++ b/io/native/src/main/scala/fs2/io/net/FdPollingSocket.scala @@ -35,6 +35,7 @@ import fs2.io.internal.SocketAddressHelpers._ import scala.scalanative.posix.sys.socket._ import scala.scalanative.posix.unistd import scala.scalanative.unsafe._ +import scala.scalanative.unsigned._ import java.util.concurrent.atomic.AtomicReference import FdPollingSocket._ @@ -87,9 +88,41 @@ private final class FdPollingSocket[F[_]]( } } - def read(maxBytes: Int): F[Option[Chunk[Byte]]] = ??? - def readN(numBytes: Int): F[Chunk[Byte]] = ??? - def reads: Stream[F, Byte] = ??? + def read(maxBytes: Int): F[Option[Chunk[Byte]]] = readMutex.lock.surround { + readBuffer.get(maxBytes).flatMap { buf => + def go: F[Option[Chunk[Byte]]] = + F.delay(guard(unistd.read(fd, buf, maxBytes.toULong))).flatMap { rtn => + if (rtn > 0) + F.delay(Some(Chunk.fromBytePtr(buf, rtn))) + else if (rtn == 0) + F.pure(None) + else + awaitReadReady *> go + } + + go + } + } + + def readN(numBytes: Int): F[Chunk[Byte]] = readMutex.lock.surround { + readBuffer.get(numBytes).flatMap { buf => + def go(pos: Int): F[Chunk[Byte]] = + F.delay(guard(unistd.read(fd, buf + pos.toLong, (numBytes - pos).toULong))).flatMap { rtn => + if (rtn > 0) { + val newPos = pos + rtn + if (newPos < numBytes) go(newPos) + else F.delay(Chunk.fromBytePtr(buf, newPos)) + } else if (rtn == 0) + F.delay(Chunk.fromBytePtr(buf, pos)) + else + awaitReadReady *> go(pos) + } + + go(0) + } + } + + def reads: Stream[F, Byte] = Stream.repeatEval(read(DefaultReadSize)).unNoneTerminate.unchunks def write(bytes: Chunk[Byte]): F[Unit] = ??? def writes: Pipe[F, Byte, Nothing] = ??? @@ -98,6 +131,8 @@ private final class FdPollingSocket[F[_]]( private object FdPollingSocket { + private final val DefaultReadSize = 8192 + private val ReadySentinel: Either[Throwable, Unit] => Unit = _ => () } From c8e23167126727f97cd7150b4e93d0d8dba524c0 Mon Sep 17 00:00:00 2001 From: Arman Bilge Date: Sun, 18 Dec 2022 01:27:07 +0000 Subject: [PATCH 11/36] Address warnings --- .../src/main/scala/fs2/io/net/FdPollingSocket.scala | 9 ++++++--- 1 file changed, 6 insertions(+), 3 deletions(-) diff --git a/io/native/src/main/scala/fs2/io/net/FdPollingSocket.scala b/io/native/src/main/scala/fs2/io/net/FdPollingSocket.scala index c8919fb760..8ca28b94a0 100644 --- a/io/native/src/main/scala/fs2/io/net/FdPollingSocket.scala +++ b/io/native/src/main/scala/fs2/io/net/FdPollingSocket.scala @@ -30,11 +30,9 @@ import com.comcast.ip4s.IpAddress import com.comcast.ip4s.SocketAddress import fs2.io.internal.NativeUtil._ import fs2.io.internal.ResizableBuffer -import fs2.io.internal.SocketAddressHelpers._ import scala.scalanative.posix.sys.socket._ import scala.scalanative.posix.unistd -import scala.scalanative.unsafe._ import scala.scalanative.unsigned._ import java.util.concurrent.atomic.AtomicReference @@ -80,7 +78,12 @@ private final class FdPollingSocket[F[_]]( def awaitReadReady: F[Unit] = F.async { cb => F.delay { if (readCallback.compareAndSet(null, cb)) - Some(F.delay(readCallback.compareAndSet(cb, null))) + Some( + F.delay { + readCallback.compareAndSet(cb, null) + () + } + ) else { cb(Either.unit) None From f0fd81d836f941b4e1768d7a7696530fcbdf5f41 Mon Sep 17 00:00:00 2001 From: Arman Bilge Date: Sun, 18 Dec 2022 01:43:45 +0000 Subject: [PATCH 12/36] Bikeshed --- .../scala/fs2/io/net/FdPollingSocket.scala | 29 ++++++++++--------- 1 file changed, 15 insertions(+), 14 deletions(-) diff --git a/io/native/src/main/scala/fs2/io/net/FdPollingSocket.scala b/io/native/src/main/scala/fs2/io/net/FdPollingSocket.scala index 8ca28b94a0..1b31245f8e 100644 --- a/io/native/src/main/scala/fs2/io/net/FdPollingSocket.scala +++ b/io/native/src/main/scala/fs2/io/net/FdPollingSocket.scala @@ -94,10 +94,10 @@ private final class FdPollingSocket[F[_]]( def read(maxBytes: Int): F[Option[Chunk[Byte]]] = readMutex.lock.surround { readBuffer.get(maxBytes).flatMap { buf => def go: F[Option[Chunk[Byte]]] = - F.delay(guard(unistd.read(fd, buf, maxBytes.toULong))).flatMap { rtn => - if (rtn > 0) - F.delay(Some(Chunk.fromBytePtr(buf, rtn))) - else if (rtn == 0) + F.delay(guard(unistd.read(fd, buf, maxBytes.toULong))).flatMap { readed => + if (readed > 0) + F.delay(Some(Chunk.fromBytePtr(buf, readed))) + else if (readed == 0) F.pure(None) else awaitReadReady *> go @@ -110,16 +110,17 @@ private final class FdPollingSocket[F[_]]( def readN(numBytes: Int): F[Chunk[Byte]] = readMutex.lock.surround { readBuffer.get(numBytes).flatMap { buf => def go(pos: Int): F[Chunk[Byte]] = - F.delay(guard(unistd.read(fd, buf + pos.toLong, (numBytes - pos).toULong))).flatMap { rtn => - if (rtn > 0) { - val newPos = pos + rtn - if (newPos < numBytes) go(newPos) - else F.delay(Chunk.fromBytePtr(buf, newPos)) - } else if (rtn == 0) - F.delay(Chunk.fromBytePtr(buf, pos)) - else - awaitReadReady *> go(pos) - } + F.delay(guard(unistd.read(fd, buf + pos.toLong, (numBytes - pos).toULong))) + .flatMap { readed => + if (readed > 0) { + val newPos = pos + readed + if (newPos < numBytes) go(newPos) + else F.delay(Chunk.fromBytePtr(buf, newPos)) + } else if (readed == 0) + F.delay(Chunk.fromBytePtr(buf, pos)) + else + awaitReadReady *> go(pos) + } go(0) } From 86ceb060960cb9625256fa37f424837ed4c00835 Mon Sep 17 00:00:00 2001 From: Arman Bilge Date: Sun, 18 Dec 2022 01:51:00 +0000 Subject: [PATCH 13/36] Implement writing --- .../scala/fs2/io/net/FdPollingSocket.scala | 44 ++++++++++++++++++- 1 file changed, 42 insertions(+), 2 deletions(-) diff --git a/io/native/src/main/scala/fs2/io/net/FdPollingSocket.scala b/io/native/src/main/scala/fs2/io/net/FdPollingSocket.scala index 1b31245f8e..15cd93aa09 100644 --- a/io/native/src/main/scala/fs2/io/net/FdPollingSocket.scala +++ b/io/native/src/main/scala/fs2/io/net/FdPollingSocket.scala @@ -31,8 +31,10 @@ import com.comcast.ip4s.SocketAddress import fs2.io.internal.NativeUtil._ import fs2.io.internal.ResizableBuffer +import scala.scalanative.meta.LinktimeInfo import scala.scalanative.posix.sys.socket._ import scala.scalanative.posix.unistd +import scala.scalanative.unsafe._ import scala.scalanative.unsigned._ import java.util.concurrent.atomic.AtomicReference @@ -128,8 +130,46 @@ private final class FdPollingSocket[F[_]]( def reads: Stream[F, Byte] = Stream.repeatEval(read(DefaultReadSize)).unNoneTerminate.unchunks - def write(bytes: Chunk[Byte]): F[Unit] = ??? - def writes: Pipe[F, Byte, Nothing] = ??? + def awaitWriteReady: F[Unit] = F.async { cb => + F.delay { + if (writeCallback.compareAndSet(null, cb)) + Some( + F.delay { + writeCallback.compareAndSet(cb, null) + () + } + ) + else { + cb(Either.unit) + None + } + } + } + + def write(bytes: Chunk[Byte]): F[Unit] = writeMutex.lock.surround { + val Chunk.ArraySlice(buf, offset, length) = bytes.toArraySlice + + def go(pos: Int): F[Unit] = + F.delay { + if (LinktimeInfo.isLinux) + send(fd, buf.at(offset + pos), (length - pos).toULong, MSG_NOSIGNAL).toInt + else + unistd.write(fd, buf.at(offset + pos), (length - pos).toULong) + }.flatMap { wrote => + if (wrote > 0) { + val newPos = pos + wrote + if (newPos < length) + go(newPos) + else + F.unit + } else + awaitWriteReady *> go(pos) + } + + go(0) + } + + def writes: Pipe[F, Byte, Nothing] = _.chunks.foreach(write(_)) } From 3eb0e18fd8e56965b4ca6dcd9de5a28d3a410dbd Mon Sep 17 00:00:00 2001 From: Arman Bilge Date: Wed, 21 Dec 2022 05:03:01 +0000 Subject: [PATCH 14/36] Bump CE snapshot, adopt new fd polling api --- build.sbt | 6 +- .../fs2/io/internal/ResizableBuffer.scala | 42 +++--- .../scala/fs2/io/net/FdPollingSocket.scala | 135 +++++------------- .../scala/fs2/io/net/tls/S2nConnection.scala | 2 +- 4 files changed, 67 insertions(+), 118 deletions(-) diff --git a/build.sbt b/build.sbt index b8d7c616ac..c532395f18 100644 --- a/build.sbt +++ b/build.sbt @@ -209,9 +209,9 @@ lazy val core = crossProject(JVMPlatform, JSPlatform, NativePlatform) libraryDependencies ++= Seq( "org.typelevel" %%% "cats-core" % "2.9.0", "org.typelevel" %%% "cats-laws" % "2.9.0" % Test, - "org.typelevel" %%% "cats-effect" % "3.5-4f9e57b", - "org.typelevel" %%% "cats-effect-laws" % "3.5-4f9e57b" % Test, - "org.typelevel" %%% "cats-effect-testkit" % "3.5-4f9e57b" % Test, + "org.typelevel" %%% "cats-effect" % "3.5-9ba870f", + "org.typelevel" %%% "cats-effect-laws" % "3.5-9ba870f" % Test, + "org.typelevel" %%% "cats-effect-testkit" % "3.5-9ba870f" % Test, "org.scodec" %%% "scodec-bits" % "1.1.34", "org.typelevel" %%% "scalacheck-effect-munit" % "2.0.0-M2" % Test, "org.typelevel" %%% "munit-cats-effect" % "2.0.0-M3" % Test, diff --git a/io/native/src/main/scala/fs2/io/internal/ResizableBuffer.scala b/io/native/src/main/scala/fs2/io/internal/ResizableBuffer.scala index 1ec4310bd0..52b946c4b4 100644 --- a/io/native/src/main/scala/fs2/io/internal/ResizableBuffer.scala +++ b/io/native/src/main/scala/fs2/io/internal/ResizableBuffer.scala @@ -21,8 +21,10 @@ package fs2.io.internal +import cats.effect.kernel.Async import cats.effect.kernel.Resource -import cats.effect.kernel.Sync +import cats.effect.std.Semaphore +import cats.syntax.all._ import scala.scalanative.libc.errno._ import scala.scalanative.libc.stdlib._ @@ -31,19 +33,22 @@ import scala.scalanative.unsafe._ import scala.scalanative.unsigned._ private[io] final class ResizableBuffer[F[_]] private ( + semaphore: Semaphore[F], private var ptr: Ptr[Byte], private[this] var size: Int -)(implicit F: Sync[F]) { +)(implicit F: Async[F]) { - def get(size: Int): F[Ptr[Byte]] = F.delay { - if (size <= this.size) - ptr - else { - ptr = realloc(ptr, size.toUInt) - this.size = size - if (ptr == null) - throw new RuntimeException(fromCString(strerror(errno))) - else ptr + def get(size: Int): Resource[F, Ptr[Byte]] = semaphore.permit.evalMap { _ => + F.delay { + if (size <= this.size) + ptr + else { + ptr = realloc(ptr, size.toUInt) + this.size = size + if (ptr == null) + throw new RuntimeException(fromCString(strerror(errno))) + else ptr + } } } @@ -51,14 +56,17 @@ private[io] final class ResizableBuffer[F[_]] private ( private[io] object ResizableBuffer { - def apply[F[_]](size: Int)(implicit F: Sync[F]): Resource[F, ResizableBuffer[F]] = + def apply[F[_]](size: Int)(implicit F: Async[F]): Resource[F, ResizableBuffer[F]] = Resource.make { - F.delay { - val ptr = malloc(size.toUInt) - if (ptr == null) - throw new RuntimeException(fromCString(strerror(errno))) - else new ResizableBuffer(ptr, size) + Semaphore[F](1).flatMap { semaphore => + F.delay { + val ptr = malloc(size.toUInt) + if (ptr == null) + throw new RuntimeException(fromCString(strerror(errno))) + else new ResizableBuffer(semaphore, ptr, size) + } } + }(buf => F.delay(free(buf.ptr))) } diff --git a/io/native/src/main/scala/fs2/io/net/FdPollingSocket.scala b/io/native/src/main/scala/fs2/io/net/FdPollingSocket.scala index 15cd93aa09..49bad882f0 100644 --- a/io/native/src/main/scala/fs2/io/net/FdPollingSocket.scala +++ b/io/native/src/main/scala/fs2/io/net/FdPollingSocket.scala @@ -22,9 +22,10 @@ package fs2 package io.net +import cats.effect.FileDescriptorPollHandle +import cats.effect.IO +import cats.effect.LiftIO import cats.effect.kernel.Async -import cats.effect.std.Mutex -import cats.effect.unsafe.FileDescriptorPoller import cats.syntax.all._ import com.comcast.ip4s.IpAddress import com.comcast.ip4s.SocketAddress @@ -36,20 +37,15 @@ import scala.scalanative.posix.sys.socket._ import scala.scalanative.posix.unistd import scala.scalanative.unsafe._ import scala.scalanative.unsigned._ -import java.util.concurrent.atomic.AtomicReference -import FdPollingSocket._ - -private final class FdPollingSocket[F[_]]( +private final class FdPollingSocket[F[_]: LiftIO]( fd: Int, + handle: FileDescriptorPollHandle, readBuffer: ResizableBuffer[F], - readMutex: Mutex[F], - writeMutex: Mutex[F], val localAddress: F[SocketAddress[IpAddress]], val remoteAddress: F[SocketAddress[IpAddress]] )(implicit F: Async[F]) - extends Socket[F] - with FileDescriptorPoller.Callback { + extends Socket[F] { @volatile private[this] var open = true @@ -63,120 +59,65 @@ private final class FdPollingSocket[F[_]]( def endOfInput: F[Unit] = F.delay(guard_(shutdown(fd, 0))) def endOfOutput: F[Unit] = F.delay(guard_(shutdown(fd, 1))) - private[this] val readCallback = new AtomicReference[Either[Throwable, Unit] => Unit] - private[this] val writeCallback = new AtomicReference[Either[Throwable, Unit] => Unit] - - def notifyFileDescriptorEvents(readReady: Boolean, writeReady: Boolean): Unit = { - if (readReady) { - val cb = readCallback.getAndSet(ReadySentinel) - if (cb ne null) cb(Either.unit) - } - if (writeReady) { - val cb = writeCallback.getAndSet(ReadySentinel) - if (cb ne null) cb(Either.unit) - } - } - - def awaitReadReady: F[Unit] = F.async { cb => - F.delay { - if (readCallback.compareAndSet(null, cb)) - Some( - F.delay { - readCallback.compareAndSet(cb, null) - () - } - ) - else { - cb(Either.unit) - None + def read(maxBytes: Int): F[Option[Chunk[Byte]]] = readBuffer.get(maxBytes).use { buf => + handle + .pollReadRec(()) { _ => + IO(guard(unistd.read(fd, buf, maxBytes.toULong))).flatMap { readed => + if (readed > 0) + IO(Right(Some(Chunk.fromBytePtr(buf, readed)))) + else if (readed == 0) + IO.pure(Right(None)) + else + IO.pure(Left(())) + } } - } + .to } - def read(maxBytes: Int): F[Option[Chunk[Byte]]] = readMutex.lock.surround { - readBuffer.get(maxBytes).flatMap { buf => - def go: F[Option[Chunk[Byte]]] = - F.delay(guard(unistd.read(fd, buf, maxBytes.toULong))).flatMap { readed => - if (readed > 0) - F.delay(Some(Chunk.fromBytePtr(buf, readed))) - else if (readed == 0) - F.pure(None) + def readN(numBytes: Int): F[Chunk[Byte]] = + readBuffer.get(numBytes).use { buf => + def go(pos: Int): IO[Either[Int, Chunk[Byte]]] = + IO(guard(unistd.read(fd, buf + pos.toLong, (numBytes - pos).toULong))).flatMap { readed => + if (readed > 0) { + val newPos = pos + readed + if (newPos < numBytes) go(newPos) + else IO(Right(Chunk.fromBytePtr(buf, newPos))) + } else if (readed == 0) + IO(Right(Chunk.fromBytePtr(buf, pos))) else - awaitReadReady *> go + IO.pure(Left(pos)) } - go + handle.pollReadRec(0)(go(_)).to } - } - def readN(numBytes: Int): F[Chunk[Byte]] = readMutex.lock.surround { - readBuffer.get(numBytes).flatMap { buf => - def go(pos: Int): F[Chunk[Byte]] = - F.delay(guard(unistd.read(fd, buf + pos.toLong, (numBytes - pos).toULong))) - .flatMap { readed => - if (readed > 0) { - val newPos = pos + readed - if (newPos < numBytes) go(newPos) - else F.delay(Chunk.fromBytePtr(buf, newPos)) - } else if (readed == 0) - F.delay(Chunk.fromBytePtr(buf, pos)) - else - awaitReadReady *> go(pos) - } - - go(0) - } - } + private[this] final val DefaultReadSize = 8192 def reads: Stream[F, Byte] = Stream.repeatEval(read(DefaultReadSize)).unNoneTerminate.unchunks - def awaitWriteReady: F[Unit] = F.async { cb => - F.delay { - if (writeCallback.compareAndSet(null, cb)) - Some( - F.delay { - writeCallback.compareAndSet(cb, null) - () - } - ) - else { - cb(Either.unit) - None - } - } - } - - def write(bytes: Chunk[Byte]): F[Unit] = writeMutex.lock.surround { + def write(bytes: Chunk[Byte]): F[Unit] = { val Chunk.ArraySlice(buf, offset, length) = bytes.toArraySlice - def go(pos: Int): F[Unit] = - F.delay { + def go(pos: Int): IO[Either[Int, Unit]] = + IO { if (LinktimeInfo.isLinux) send(fd, buf.at(offset + pos), (length - pos).toULong, MSG_NOSIGNAL).toInt else unistd.write(fd, buf.at(offset + pos), (length - pos).toULong) }.flatMap { wrote => - if (wrote > 0) { + if (wrote >= 0) { val newPos = pos + wrote if (newPos < length) go(newPos) else - F.unit + IO.pure(Either.unit) } else - awaitWriteReady *> go(pos) + IO.pure(Left(pos)) } - go(0) + handle.pollWriteRec(0)(go(_)).to } def writes: Pipe[F, Byte, Nothing] = _.chunks.foreach(write(_)) } - -private object FdPollingSocket { - - private final val DefaultReadSize = 8192 - - private val ReadySentinel: Either[Throwable, Unit] => Unit = _ => () - -} diff --git a/io/native/src/main/scala/fs2/io/net/tls/S2nConnection.scala b/io/native/src/main/scala/fs2/io/net/tls/S2nConnection.scala index 67dbd605dd..0eac89ff81 100644 --- a/io/native/src/main/scala/fs2/io/net/tls/S2nConnection.scala +++ b/io/native/src/main/scala/fs2/io/net/tls/S2nConnection.scala @@ -136,7 +136,7 @@ private[tls] object S2nConnection { }.iterateUntil(_.toInt == S2N_NOT_BLOCKED) *> F.delay(guard_(s2n_connection_free_handshake(conn))) - def read(n: Int) = readBuffer.get(n).flatMap { buf => + def read(n: Int) = readBuffer.get(n).use { buf => def go(i: Int): F[Option[Chunk[Byte]]] = F.delay { readTasks.set(F.unit) From 8889e05f05b11676fa7b83c543496909a6492c75 Mon Sep 17 00:00:00 2001 From: Arman Bilge Date: Thu, 22 Dec 2022 00:19:33 +0000 Subject: [PATCH 15/36] First attempt at unix sockets --- .../scala/fs2/io/internal/NativeUtil.scala | 7 + ...dressHelpers.scala => SocketHelpers.scala} | 43 ++++- .../scala/fs2/io/internal/syssocket.scala | 44 +++++ .../main/scala/fs2/io/internal/sysun.scala | 48 ++++++ .../src/main/scala/fs2/io/ioplatform.scala | 18 +- .../scala/fs2/io/net/FdPollingSocket.scala | 31 ++-- .../net/unixsocket/FdPollingUnixSockets.scala | 158 ++++++++++++++++++ 7 files changed, 334 insertions(+), 15 deletions(-) rename io/native/src/main/scala/fs2/io/internal/{SocketAddressHelpers.scala => SocketHelpers.scala} (83%) create mode 100644 io/native/src/main/scala/fs2/io/internal/syssocket.scala create mode 100644 io/native/src/main/scala/fs2/io/internal/sysun.scala create mode 100644 io/native/src/main/scala/fs2/io/net/unixsocket/FdPollingUnixSockets.scala diff --git a/io/native/src/main/scala/fs2/io/internal/NativeUtil.scala b/io/native/src/main/scala/fs2/io/internal/NativeUtil.scala index 9f0fe5cbe5..80a0a89446 100644 --- a/io/native/src/main/scala/fs2/io/internal/NativeUtil.scala +++ b/io/native/src/main/scala/fs2/io/internal/NativeUtil.scala @@ -21,8 +21,11 @@ package fs2.io.internal +import cats.effect.Sync + import scala.scalanative.annotation.alwaysinline import scala.scalanative.libc.errno._ +import scala.scalanative.posix.fcntl._ import scala.scalanative.posix.errno._ import scala.scalanative.posix.string._ import scala.scalanative.unsafe._ @@ -47,4 +50,8 @@ private[io] object NativeUtil { rtn } + def setNonBlocking[F[_]](fd: CInt)(implicit F: Sync[F]): F[Unit] = F.delay { + guard_(fcntl(fd, F_SETFL, O_NONBLOCK)) + } + } diff --git a/io/native/src/main/scala/fs2/io/internal/SocketAddressHelpers.scala b/io/native/src/main/scala/fs2/io/internal/SocketHelpers.scala similarity index 83% rename from io/native/src/main/scala/fs2/io/internal/SocketAddressHelpers.scala rename to io/native/src/main/scala/fs2/io/internal/SocketHelpers.scala index a0a3ad2e76..b0521ea82b 100644 --- a/io/native/src/main/scala/fs2/io/internal/SocketAddressHelpers.scala +++ b/io/native/src/main/scala/fs2/io/internal/SocketHelpers.scala @@ -31,20 +31,59 @@ import com.comcast.ip4s.Port import com.comcast.ip4s.SocketAddress import java.io.IOException +import scala.scalanative.meta.LinktimeInfo import scala.scalanative.posix.arpa.inet._ import scala.scalanative.posix.sys.socket._ import scala.scalanative.posix.sys.socketOps._ +import scala.scalanative.posix.unistd._ import scala.scalanative.unsafe._ import scala.scalanative.unsigned._ import NativeUtil._ import netinetin._ import netinetinOps._ +import syssocket._ -private[io] object SocketAddressHelpers { +private[io] object SocketHelpers { + + def openNonBlocking[F[_]](domain: CInt, `type`: CInt)(implicit F: Sync[F]): Resource[F, CInt] = + Resource + .make { + F.delay { + val SOCK_NONBLOCK = + if (LinktimeInfo.isLinux) + syssocket.SOCK_NONBLOCK + else 0 + + guard(socket(domain, `type` | SOCK_NONBLOCK, 0)) + } + }(fd => F.delay(guard_(close(fd)))) + .evalTap { fd => + (if (!LinktimeInfo.isLinux) setNonBlocking(fd) else F.unit) *> + (if (LinktimeInfo.isMac) setNoSigPipe(fd) else F.unit) + } + + // macOS-only + def setNoSigPipe[F[_]: Sync](fd: CInt): F[Unit] = + setOption(fd, SO_NOSIGPIPE, true) + + def setOption[F[_]](fd: CInt, option: CInt, value: Boolean)(implicit F: Sync[F]): F[Unit] = + F.delay { + val ptr = stackalloc[CInt]() + !ptr = if (value.asInstanceOf[java.lang.Boolean]) 1 else 0 + guard_( + setsockopt( + fd, + SOL_SOCKET, + option, + ptr.asInstanceOf[Ptr[Byte]], + sizeof[CInt].toUInt + ) + ) + } def getLocalAddress[F[_]](fd: Int)(implicit F: Sync[F]): F[SocketAddress[IpAddress]] = - SocketAddressHelpers.toSocketAddress { (addr, len) => + SocketHelpers.toSocketAddress { (addr, len) => F.delay(guard_(getsockname(fd, addr, len))) } diff --git a/io/native/src/main/scala/fs2/io/internal/syssocket.scala b/io/native/src/main/scala/fs2/io/internal/syssocket.scala new file mode 100644 index 0000000000..f8af29ee8e --- /dev/null +++ b/io/native/src/main/scala/fs2/io/internal/syssocket.scala @@ -0,0 +1,44 @@ +/* + * Copyright (c) 2013 Functional Streams for Scala + * + * Permission is hereby granted, free of charge, to any person obtaining a copy of + * this software and associated documentation files (the "Software"), to deal in + * the Software without restriction, including without limitation the rights to + * use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of + * the Software, and to permit persons to whom the Software is furnished to do so, + * subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS + * FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR + * COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER + * IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN + * CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. + */ + +package fs2.io.internal + +import scala.scalanative.posix.sys.socket._ +import scala.scalanative.unsafe._ + +@extern +private[io] object syssocket { + // only in Linux and FreeBSD, but not macOS + final val SOCK_NONBLOCK = 2048 + + // only on macOS and some BSDs (?) + final val SO_NOSIGPIPE = 0x1022 /* APPLE: No SIGPIPE on EPIPE */ + + def bind(sockfd: CInt, addr: Ptr[sockaddr], addrlen: socklen_t): CInt = + extern + + def accept(sockfd: CInt, addr: Ptr[sockaddr], addrlen: Ptr[socklen_t]): CInt = + extern + + // only supported on Linux and FreeBSD, but not macOS + def accept4(sockfd: CInt, addr: Ptr[sockaddr], addrlen: Ptr[socklen_t], flags: CInt): CInt = + extern +} diff --git a/io/native/src/main/scala/fs2/io/internal/sysun.scala b/io/native/src/main/scala/fs2/io/internal/sysun.scala new file mode 100644 index 0000000000..951a1ca346 --- /dev/null +++ b/io/native/src/main/scala/fs2/io/internal/sysun.scala @@ -0,0 +1,48 @@ +/* + * Copyright (c) 2013 Functional Streams for Scala + * + * Permission is hereby granted, free of charge, to any person obtaining a copy of + * this software and associated documentation files (the "Software"), to deal in + * the Software without restriction, including without limitation the rights to + * use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of + * the Software, and to permit persons to whom the Software is furnished to do so, + * subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS + * FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR + * COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER + * IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN + * CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. + */ + +package fs2.io.internal + +import scala.scalanative.posix.sys.socket._ +import scala.scalanative.unsafe._ + +private[io] object sysun { + import Nat._ + type _108 = Digit3[_1, _0, _8] + + type sockaddr_un = CStruct2[ + sa_family_t, + CArray[CChar, _108] + ] + +} + +private[io] object sysunOps { + import sysun._ + + implicit final class sockaddr_unOps(val sockaddr_un: Ptr[sockaddr_un]) extends AnyVal { + def sun_family: sa_family_t = sockaddr_un._1 + def sun_family_=(sun_family: sa_family_t): Unit = sockaddr_un._1 = sun_family + def sun_path: CArray[CChar, _108] = sockaddr_un._2 + def sun_path_=(sun_path: CArray[CChar, _108]): Unit = sockaddr_un._2 = sun_path + } + +} diff --git a/io/native/src/main/scala/fs2/io/ioplatform.scala b/io/native/src/main/scala/fs2/io/ioplatform.scala index 4cdd4d1f8f..43c8807c99 100644 --- a/io/native/src/main/scala/fs2/io/ioplatform.scala +++ b/io/native/src/main/scala/fs2/io/ioplatform.scala @@ -22,4 +22,20 @@ package fs2 package io -private[fs2] trait ioplatform extends iojvmnative +import cats.effect.FileDescriptorPoller +import cats.effect.IO +import cats.effect.LiftIO +import cats.syntax.all._ + +private[fs2] trait ioplatform extends iojvmnative { + + private[fs2] def fileDescriptorPoller[F[_]: LiftIO]: F[FileDescriptorPoller] = + IO.poller[FileDescriptorPoller] + .flatMap( + _.liftTo[IO]( + new RuntimeException("Installed PollingSystem does not provide a FileDescriptorPoller") + ) + ) + .to + +} diff --git a/io/native/src/main/scala/fs2/io/net/FdPollingSocket.scala b/io/native/src/main/scala/fs2/io/net/FdPollingSocket.scala index 49bad882f0..1fb4e7a71e 100644 --- a/io/native/src/main/scala/fs2/io/net/FdPollingSocket.scala +++ b/io/native/src/main/scala/fs2/io/net/FdPollingSocket.scala @@ -26,6 +26,7 @@ import cats.effect.FileDescriptorPollHandle import cats.effect.IO import cats.effect.LiftIO import cats.effect.kernel.Async +import cats.effect.kernel.Resource import cats.syntax.all._ import com.comcast.ip4s.IpAddress import com.comcast.ip4s.SocketAddress @@ -38,24 +39,18 @@ import scala.scalanative.posix.unistd import scala.scalanative.unsafe._ import scala.scalanative.unsigned._ -private final class FdPollingSocket[F[_]: LiftIO]( +import FdPollingSocket._ + +private final class FdPollingSocket[F[_]: LiftIO] private ( fd: Int, handle: FileDescriptorPollHandle, readBuffer: ResizableBuffer[F], + val isOpen: F[Boolean], val localAddress: F[SocketAddress[IpAddress]], val remoteAddress: F[SocketAddress[IpAddress]] )(implicit F: Async[F]) extends Socket[F] { - @volatile private[this] var open = true - - def isOpen: F[Boolean] = F.delay(open) - - def close: F[Unit] = F.delay { - open = false - guard_(unistd.close(fd)) - } - def endOfInput: F[Unit] = F.delay(guard_(shutdown(fd, 0))) def endOfOutput: F[Unit] = F.delay(guard_(shutdown(fd, 1))) @@ -91,8 +86,6 @@ private final class FdPollingSocket[F[_]: LiftIO]( handle.pollReadRec(0)(go(_)).to } - private[this] final val DefaultReadSize = 8192 - def reads: Stream[F, Byte] = Stream.repeatEval(read(DefaultReadSize)).unNoneTerminate.unchunks def write(bytes: Chunk[Byte]): F[Unit] = { @@ -121,3 +114,17 @@ private final class FdPollingSocket[F[_]: LiftIO]( def writes: Pipe[F, Byte, Nothing] = _.chunks.foreach(write(_)) } + +private object FdPollingSocket { + private final val DefaultReadSize = 8192 + + def apply[F[_]: LiftIO]( + fd: Int, + handle: FileDescriptorPollHandle, + localAddress: F[SocketAddress[IpAddress]], + remoteAddress: F[SocketAddress[IpAddress]] + )(implicit F: Async[F]): Resource[F, Socket[F]] = for { + buffer <- ResizableBuffer(DefaultReadSize) + isOpen <- Resource.make(F.ref(true))(_.set(false)) + } yield new FdPollingSocket(fd, handle, buffer, isOpen.get, localAddress, remoteAddress) +} diff --git a/io/native/src/main/scala/fs2/io/net/unixsocket/FdPollingUnixSockets.scala b/io/native/src/main/scala/fs2/io/net/unixsocket/FdPollingUnixSockets.scala new file mode 100644 index 0000000000..a4fab6f097 --- /dev/null +++ b/io/native/src/main/scala/fs2/io/net/unixsocket/FdPollingUnixSockets.scala @@ -0,0 +1,158 @@ +/* + * Copyright (c) 2013 Functional Streams for Scala + * + * Permission is hereby granted, free of charge, to any person obtaining a copy of + * this software and associated documentation files (the "Software"), to deal in + * the Software without restriction, including without limitation the rights to + * use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of + * the Software, and to permit persons to whom the Software is furnished to do so, + * subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS + * FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR + * COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER + * IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN + * CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. + */ + +package fs2 +package io +package net +package unixsocket + +import cats.effect.IO +import cats.effect.LiftIO +import cats.effect.kernel.Async +import cats.effect.kernel.Resource +import cats.syntax.all._ +import fs2.io.file.Files +import fs2.io.file.Path +import fs2.io.internal.NativeUtil._ +import fs2.io.internal.SocketHelpers +import fs2.io.internal.syssocket._ +import fs2.io.internal.sysun._ +import fs2.io.internal.sysunOps._ + +import scala.scalanative.libc.errno._ +import scala.scalanative.meta.LinktimeInfo +import scala.scalanative.posix.errno._ +import scala.scalanative.posix.string._ +import scala.scalanative.posix.sys.socket.{bind => _, accept => _, _} +import scala.scalanative.posix.unistd._ +import scala.scalanative.unsafe._ +import scala.scalanative.unsigned._ + +private final class FdPollingUnixSockets[F[_]: Files: LiftIO](implicit F: Async[F]) + extends UnixSockets[F] { + + def client(address: UnixSocketAddress): Resource[F, Socket[F]] = for { + poller <- Resource.eval(fileDescriptorPoller[F]) + fd <- SocketHelpers.openNonBlocking(AF_UNIX, SOCK_STREAM) + handle <- poller.registerFileDescriptor(fd, true, true).mapK(LiftIO.liftK) + _ <- Resource.eval { + toSockaddrUn(address.path).use { addr => + handle + .pollWriteRec(false) { connected => + if (connected) IO.pure(Either.unit) + else + IO { + if (connect(fd, addr, sizeof[sockaddr_un].toUInt) < 0) { + val e = errno + if (e == EINPROGRESS) + Left(true) // we will be connected when we unblock + else if (e == ECONNREFUSED) + throw new ConnectException(fromCString(strerror(errno))) + else + throw new IOException(fromCString(strerror(errno))) + } else + Either.unit + } + } + .to + } + } + socket <- FdPollingSocket[F](fd, handle, raiseIpAddressError, raiseIpAddressError) + } yield socket + + def server( + address: UnixSocketAddress, + deleteIfExists: Boolean, + deleteOnClose: Boolean + ): Stream[F, Socket[F]] = for { + poller <- Stream.eval(fileDescriptorPoller[F]) + + _ <- Stream.bracket(Files[F].deleteIfExists(Path(address.path)).whenA(deleteIfExists)) { _ => + Files[F].deleteIfExists(Path(address.path)).whenA(deleteOnClose) + } + + fd <- Stream.resource(SocketHelpers.openNonBlocking(AF_UNIX, SOCK_STREAM)) + handle <- Stream.resource(poller.registerFileDescriptor(fd, true, false).mapK(LiftIO.liftK)) + + _ <- Stream.eval { + toSockaddrUn(address.path).use { addr => + F.delay(guard_(bind(fd, addr, sizeof[sockaddr_un].toUInt))) + } *> F.delay(guard_(listen(fd, 0))) + } + + socket <- Stream + .resource { + val accepted = for { + fd <- Resource.makeFull[F, Int] { poll => + poll { + handle + .pollReadRec(()) { _ => + IO { + val clientFd = + if (LinktimeInfo.isLinux) + guard(accept(fd, null, null)) + else + guard(accept4(fd, null, null, SOCK_NONBLOCK)) + + if (clientFd >= 0) + Right(clientFd) + else + Left(()) + } + } + .to + } + }(fd => F.delay(guard_(close(fd)))) + _ <- + if (!LinktimeInfo.isLinux) + Resource.eval(setNonBlocking(fd)) + else Resource.unit[F] + handle <- poller.registerFileDescriptor(fd, true, true).mapK(LiftIO.liftK) + socket <- FdPollingSocket[F](fd, handle, raiseIpAddressError, raiseIpAddressError) + } yield socket + + accepted.attempt + .map(_.toOption) + } + .repeat + .unNone + + } yield socket + + private def toSockaddrUn(path: String): Resource[F, Ptr[sockaddr]] = + Resource.make(F.delay(Zone.open()))(z => F.delay(z.close())).evalMap[Ptr[sockaddr]] { + implicit z => + val pathBytes = path.getBytes + if (pathBytes.length > 107) + F.raiseError(new IllegalArgumentException(s"Path too long: $path")) + else + F.delay { + val addr = alloc[sockaddr_un]() + addr.sun_family = AF_UNIX.toUShort + memcpy(addr.sun_path.at(0), pathBytes.at(0), pathBytes.length.toULong) + addr.asInstanceOf[Ptr[sockaddr]] + } + } + + private def raiseIpAddressError[A]: F[A] = + F.raiseError(new UnsupportedOperationException("UnixSockets do not use IP addressing")) + +} From 5c62bc2728a8301dd25af3d14d7b8de51dbb30b1 Mon Sep 17 00:00:00 2001 From: Arman Bilge Date: Thu, 22 Dec 2022 00:37:49 +0000 Subject: [PATCH 16/36] Cross-compile unixsockets tests --- .../src/test/scala/fs2/io/Fs2IoSuite.scala | 25 ------------------- .../net/unixsocket/UnixSocketsPlatform.scala | 8 +++++- .../test/scala/fs2/io/net/tls/TLSSuite.scala | 2 +- .../UnixSocketsSuitePlatform.scala} | 8 +++--- io/shared/src/test/scala/fs2/io/IoSuite.scala | 2 +- .../test/scala/fs2/io/file/FilesSuite.scala | 2 +- .../test/scala/fs2/io/file/PathSuite.scala | 2 +- .../fs2/io/file/PosixPermissionsSuite.scala | 2 +- .../scala/fs2/io/net/tcp/SocketSuite.scala | 2 +- .../io/net/unixsocket/UnixSocketsSuite.scala | 0 10 files changed, 17 insertions(+), 36 deletions(-) delete mode 100644 io/js-jvm/src/test/scala/fs2/io/Fs2IoSuite.scala rename io/native/src/test/scala/fs2/io/{Fs2IoSuite.scala => net/unixsockets/UnixSocketsSuitePlatform.scala} (87%) rename io/{js-jvm => shared}/src/test/scala/fs2/io/net/unixsocket/UnixSocketsSuite.scala (100%) diff --git a/io/js-jvm/src/test/scala/fs2/io/Fs2IoSuite.scala b/io/js-jvm/src/test/scala/fs2/io/Fs2IoSuite.scala deleted file mode 100644 index 76c8042f20..0000000000 --- a/io/js-jvm/src/test/scala/fs2/io/Fs2IoSuite.scala +++ /dev/null @@ -1,25 +0,0 @@ -/* - * Copyright (c) 2013 Functional Streams for Scala - * - * Permission is hereby granted, free of charge, to any person obtaining a copy of - * this software and associated documentation files (the "Software"), to deal in - * the Software without restriction, including without limitation the rights to - * use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of - * the Software, and to permit persons to whom the Software is furnished to do so, - * subject to the following conditions: - * - * The above copyright notice and this permission notice shall be included in all - * copies or substantial portions of the Software. - * - * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR - * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS - * FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR - * COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER - * IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN - * CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. - */ - -package fs2 -package io - -abstract class Fs2IoSuite extends Fs2Suite diff --git a/io/native/src/main/scala/fs2/io/net/unixsocket/UnixSocketsPlatform.scala b/io/native/src/main/scala/fs2/io/net/unixsocket/UnixSocketsPlatform.scala index e5599efece..689f56e6da 100644 --- a/io/native/src/main/scala/fs2/io/net/unixsocket/UnixSocketsPlatform.scala +++ b/io/native/src/main/scala/fs2/io/net/unixsocket/UnixSocketsPlatform.scala @@ -21,4 +21,10 @@ package fs2.io.net.unixsocket -private[unixsocket] trait UnixSocketsCompanionPlatform +import cats.effect.LiftIO +import cats.effect.kernel.Async + +private[unixsocket] trait UnixSocketsCompanionPlatform { + implicit def forAsync[F[_]: Async: LiftIO]: UnixSockets[F] = + new FdPollingUnixSockets[F] +} diff --git a/io/native/src/test/scala/fs2/io/net/tls/TLSSuite.scala b/io/native/src/test/scala/fs2/io/net/tls/TLSSuite.scala index 4fb261bb27..fce760199e 100644 --- a/io/native/src/test/scala/fs2/io/net/tls/TLSSuite.scala +++ b/io/native/src/test/scala/fs2/io/net/tls/TLSSuite.scala @@ -30,7 +30,7 @@ import fs2.io.file.Files import fs2.io.file.Path import scodec.bits.ByteVector -abstract class TLSSuite extends Fs2IoSuite { +abstract class TLSSuite extends Fs2Suite { def testTlsContext: Resource[IO, TLSContext[IO]] = for { cert <- Resource.eval { Files[IO].readAll(Path("io/shared/src/test/resources/cert.pem")).compile.to(ByteVector) diff --git a/io/native/src/test/scala/fs2/io/Fs2IoSuite.scala b/io/native/src/test/scala/fs2/io/net/unixsockets/UnixSocketsSuitePlatform.scala similarity index 87% rename from io/native/src/test/scala/fs2/io/Fs2IoSuite.scala rename to io/native/src/test/scala/fs2/io/net/unixsockets/UnixSocketsSuitePlatform.scala index 40512de1ab..92ac7a5949 100644 --- a/io/native/src/test/scala/fs2/io/Fs2IoSuite.scala +++ b/io/native/src/test/scala/fs2/io/net/unixsockets/UnixSocketsSuitePlatform.scala @@ -20,10 +20,10 @@ */ package fs2 -package io +package io.net.unixsocket -import epollcat.unsafe.EpollRuntime +import cats.effect.IO -abstract class Fs2IoSuite extends Fs2Suite { - override def munitIORuntime = EpollRuntime.global +trait UnixSocketsSuitePlatform { self: UnixSocketsSuite => + testProvider("native")(UnixSockets.forAsync[IO]) } diff --git a/io/shared/src/test/scala/fs2/io/IoSuite.scala b/io/shared/src/test/scala/fs2/io/IoSuite.scala index 387d32adad..23ffd78022 100644 --- a/io/shared/src/test/scala/fs2/io/IoSuite.scala +++ b/io/shared/src/test/scala/fs2/io/IoSuite.scala @@ -28,7 +28,7 @@ import org.scalacheck.effect.PropF.forAllF import java.io.ByteArrayInputStream import java.io.InputStream -class IoSuite extends io.Fs2IoSuite { +class IoSuite extends Fs2Suite { group("readInputStream") { test("non-buffered") { forAllF { (bytes: Array[Byte], chunkSize0: Int) => diff --git a/io/shared/src/test/scala/fs2/io/file/FilesSuite.scala b/io/shared/src/test/scala/fs2/io/file/FilesSuite.scala index f9f9cf2929..a514344ac1 100644 --- a/io/shared/src/test/scala/fs2/io/file/FilesSuite.scala +++ b/io/shared/src/test/scala/fs2/io/file/FilesSuite.scala @@ -29,7 +29,7 @@ import cats.syntax.all._ import scala.concurrent.duration._ -class FilesSuite extends Fs2IoSuite with BaseFileSuite { +class FilesSuite extends Fs2Suite with BaseFileSuite { group("readAll") { test("retrieves whole content of a file") { diff --git a/io/shared/src/test/scala/fs2/io/file/PathSuite.scala b/io/shared/src/test/scala/fs2/io/file/PathSuite.scala index e0229b32df..d25a80305f 100644 --- a/io/shared/src/test/scala/fs2/io/file/PathSuite.scala +++ b/io/shared/src/test/scala/fs2/io/file/PathSuite.scala @@ -31,7 +31,7 @@ import org.scalacheck.Cogen import org.scalacheck.Gen import org.scalacheck.Prop.forAll -class PathSuite extends Fs2IoSuite { +class PathSuite extends Fs2Suite { implicit val arbitraryPath: Arbitrary[Path] = Arbitrary(for { names <- Gen.listOf(Gen.alphaNumStr) diff --git a/io/shared/src/test/scala/fs2/io/file/PosixPermissionsSuite.scala b/io/shared/src/test/scala/fs2/io/file/PosixPermissionsSuite.scala index 17df211606..79f116bde1 100644 --- a/io/shared/src/test/scala/fs2/io/file/PosixPermissionsSuite.scala +++ b/io/shared/src/test/scala/fs2/io/file/PosixPermissionsSuite.scala @@ -23,7 +23,7 @@ package fs2 package io package file -class PosixPermissionsSuite extends Fs2IoSuite { +class PosixPermissionsSuite extends Fs2Suite { test("construction") { val cases = Seq( "777" -> "rwxrwxrwx", diff --git a/io/shared/src/test/scala/fs2/io/net/tcp/SocketSuite.scala b/io/shared/src/test/scala/fs2/io/net/tcp/SocketSuite.scala index b7f0c761b2..d0c28e0c73 100644 --- a/io/shared/src/test/scala/fs2/io/net/tcp/SocketSuite.scala +++ b/io/shared/src/test/scala/fs2/io/net/tcp/SocketSuite.scala @@ -31,7 +31,7 @@ import com.comcast.ip4s._ import scala.concurrent.duration._ import scala.concurrent.TimeoutException -class SocketSuite extends Fs2IoSuite with SocketSuitePlatform { +class SocketSuite extends Fs2Suite with SocketSuitePlatform { val timeout = 30.seconds diff --git a/io/js-jvm/src/test/scala/fs2/io/net/unixsocket/UnixSocketsSuite.scala b/io/shared/src/test/scala/fs2/io/net/unixsocket/UnixSocketsSuite.scala similarity index 100% rename from io/js-jvm/src/test/scala/fs2/io/net/unixsocket/UnixSocketsSuite.scala rename to io/shared/src/test/scala/fs2/io/net/unixsocket/UnixSocketsSuite.scala From adba4328380863765e2a7016411d61c001e35cce Mon Sep 17 00:00:00 2001 From: Arman Bilge Date: Thu, 22 Dec 2022 00:46:34 +0000 Subject: [PATCH 17/36] Workaround another borked method --- io/native/src/main/scala/fs2/io/internal/syssocket.scala | 3 +++ .../scala/fs2/io/net/unixsocket/FdPollingUnixSockets.scala | 2 +- 2 files changed, 4 insertions(+), 1 deletion(-) diff --git a/io/native/src/main/scala/fs2/io/internal/syssocket.scala b/io/native/src/main/scala/fs2/io/internal/syssocket.scala index f8af29ee8e..984be7acb0 100644 --- a/io/native/src/main/scala/fs2/io/internal/syssocket.scala +++ b/io/native/src/main/scala/fs2/io/internal/syssocket.scala @@ -35,6 +35,9 @@ private[io] object syssocket { def bind(sockfd: CInt, addr: Ptr[sockaddr], addrlen: socklen_t): CInt = extern + def connect(sockfd: CInt, addr: Ptr[sockaddr], addrlen: socklen_t): CInt = + extern + def accept(sockfd: CInt, addr: Ptr[sockaddr], addrlen: Ptr[socklen_t]): CInt = extern diff --git a/io/native/src/main/scala/fs2/io/net/unixsocket/FdPollingUnixSockets.scala b/io/native/src/main/scala/fs2/io/net/unixsocket/FdPollingUnixSockets.scala index a4fab6f097..0e24f7935c 100644 --- a/io/native/src/main/scala/fs2/io/net/unixsocket/FdPollingUnixSockets.scala +++ b/io/native/src/main/scala/fs2/io/net/unixsocket/FdPollingUnixSockets.scala @@ -41,7 +41,7 @@ import scala.scalanative.libc.errno._ import scala.scalanative.meta.LinktimeInfo import scala.scalanative.posix.errno._ import scala.scalanative.posix.string._ -import scala.scalanative.posix.sys.socket.{bind => _, accept => _, _} +import scala.scalanative.posix.sys.socket.{bind => _, connect => _, accept => _, _} import scala.scalanative.posix.unistd._ import scala.scalanative.unsafe._ import scala.scalanative.unsigned._ From 0aa9260604d6efc9035facad986c59da2e4991c6 Mon Sep 17 00:00:00 2001 From: Arman Bilge Date: Thu, 22 Dec 2022 01:48:21 +0000 Subject: [PATCH 18/36] Bump base version --- build.sbt | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/build.sbt b/build.sbt index c532395f18..f4f45284ec 100644 --- a/build.sbt +++ b/build.sbt @@ -2,7 +2,7 @@ import com.typesafe.tools.mima.core._ Global / onChangedBuildSource := ReloadOnSourceChanges -ThisBuild / tlBaseVersion := "3.4" +ThisBuild / tlBaseVersion := "3.5" ThisBuild / organization := "co.fs2" ThisBuild / organizationName := "Functional Streams for Scala" From 64d7a761c3ce84932b299e7d2b18bd3b781ecf69 Mon Sep 17 00:00:00 2001 From: Arman Bilge Date: Thu, 22 Dec 2022 04:01:27 +0000 Subject: [PATCH 19/36] Implement non-blocking std{in,out,err} --- .../src/main/scala/fs2/io/iojvmnative.scala | 30 ---- io/jvm/src/main/scala/fs2/io/ioplatform.scala | 30 ++++ .../src/main/scala/fs2/io/ioplatform.scala | 129 ++++++++++++++++++ 3 files changed, 159 insertions(+), 30 deletions(-) diff --git a/io/jvm-native/src/main/scala/fs2/io/iojvmnative.scala b/io/jvm-native/src/main/scala/fs2/io/iojvmnative.scala index 4066bafc78..8bcdeed359 100644 --- a/io/jvm-native/src/main/scala/fs2/io/iojvmnative.scala +++ b/io/jvm-native/src/main/scala/fs2/io/iojvmnative.scala @@ -22,46 +22,16 @@ package fs2 package io -import cats._ import cats.effect.kernel.Sync import cats.effect.kernel.implicits._ import cats.syntax.all._ -import java.nio.charset.Charset -import java.nio.charset.StandardCharsets import scala.reflect.ClassTag private[fs2] trait iojvmnative { type InterruptedIOException = java.io.InterruptedIOException type ClosedChannelException = java.nio.channels.ClosedChannelException - // - // STDIN/STDOUT Helpers - - /** Stream of bytes read asynchronously from standard input. */ - def stdin[F[_]: Sync](bufSize: Int): Stream[F, Byte] = - readInputStream(Sync[F].blocking(System.in), bufSize, false) - - /** Pipe of bytes that writes emitted values to standard output asynchronously. */ - def stdout[F[_]: Sync]: Pipe[F, Byte, Nothing] = - writeOutputStream(Sync[F].blocking(System.out), false) - - /** Pipe of bytes that writes emitted values to standard error asynchronously. */ - def stderr[F[_]: Sync]: Pipe[F, Byte, Nothing] = - writeOutputStream(Sync[F].blocking(System.err), false) - - /** Writes this stream to standard output asynchronously, converting each element to - * a sequence of bytes via `Show` and the given `Charset`. - */ - def stdoutLines[F[_]: Sync, O: Show]( - charset: Charset = StandardCharsets.UTF_8 - ): Pipe[F, O, Nothing] = - _.map(_.show).through(text.encode(charset)).through(stdout) - - /** Stream of `String` read asynchronously from standard input decoded in UTF-8. */ - def stdinUtf8[F[_]: Sync](bufSize: Int): Stream[F, String] = - stdin(bufSize).through(text.utf8.decode) - /** Stream of bytes read asynchronously from the specified resource relative to the class `C`. * @see [[readClassLoaderResource]] for a resource relative to a classloader. */ diff --git a/io/jvm/src/main/scala/fs2/io/ioplatform.scala b/io/jvm/src/main/scala/fs2/io/ioplatform.scala index 1bf51864aa..499e8d5b55 100644 --- a/io/jvm/src/main/scala/fs2/io/ioplatform.scala +++ b/io/jvm/src/main/scala/fs2/io/ioplatform.scala @@ -22,6 +22,7 @@ package fs2 package io +import cats.Show import cats.effect.kernel.{Async, Outcome, Resource, Sync} import cats.effect.kernel.implicits._ import cats.effect.kernel.Deferred @@ -29,9 +30,38 @@ import cats.syntax.all._ import fs2.io.internal.PipedStreamBuffer import java.io.{InputStream, OutputStream} +import java.nio.charset.Charset +import java.nio.charset.StandardCharsets private[fs2] trait ioplatform extends iojvmnative { + // + // STDIN/STDOUT Helpers + + /** Stream of bytes read asynchronously from standard input. */ + def stdin[F[_]: Sync](bufSize: Int): Stream[F, Byte] = + readInputStream(Sync[F].blocking(System.in), bufSize, false) + + /** Pipe of bytes that writes emitted values to standard output asynchronously. */ + def stdout[F[_]: Sync]: Pipe[F, Byte, Nothing] = + writeOutputStream(Sync[F].blocking(System.out), false) + + /** Pipe of bytes that writes emitted values to standard error asynchronously. */ + def stderr[F[_]: Sync]: Pipe[F, Byte, Nothing] = + writeOutputStream(Sync[F].blocking(System.err), false) + + /** Writes this stream to standard output asynchronously, converting each element to + * a sequence of bytes via `Show` and the given `Charset`. + */ + def stdoutLines[F[_]: Sync, O: Show]( + charset: Charset = StandardCharsets.UTF_8 + ): Pipe[F, O, Nothing] = + _.map(_.show).through(text.encode(charset)).through(stdout) + + /** Stream of `String` read asynchronously from standard input decoded in UTF-8. */ + def stdinUtf8[F[_]: Sync](bufSize: Int): Stream[F, String] = + stdin(bufSize).through(text.utf8.decode) + /** Pipe that converts a stream of bytes to a stream that will emit a single `java.io.InputStream`, * that is closed whenever the resulting stream terminates. * diff --git a/io/native/src/main/scala/fs2/io/ioplatform.scala b/io/native/src/main/scala/fs2/io/ioplatform.scala index 43c8807c99..66e96834b8 100644 --- a/io/native/src/main/scala/fs2/io/ioplatform.scala +++ b/io/native/src/main/scala/fs2/io/ioplatform.scala @@ -22,10 +22,23 @@ package fs2 package io +import cats.Show import cats.effect.FileDescriptorPoller import cats.effect.IO import cats.effect.LiftIO +import cats.effect.kernel.Async +import cats.effect.kernel.Resource +import cats.effect.kernel.Sync import cats.syntax.all._ +import fs2.io.internal.NativeUtil._ + +import java.io.OutputStream +import java.nio.charset.Charset +import java.nio.charset.StandardCharsets +import scala.scalanative.meta.LinktimeInfo +import scala.scalanative.posix.unistd._ +import scala.scalanative.unsafe._ +import scala.scalanative.unsigned._ private[fs2] trait ioplatform extends iojvmnative { @@ -38,4 +51,120 @@ private[fs2] trait ioplatform extends iojvmnative { ) .to + // + // STDIN/STDOUT Helpers + + /** Stream of bytes read asynchronously from standard input. */ + def stdin[F[_]: Async: LiftIO](bufSize: Int): Stream[F, Byte] = + if (LinktimeInfo.isLinux || LinktimeInfo.isMac) + Stream + .resource { + Resource + .eval { + setNonBlocking(STDIN_FILENO) *> fileDescriptorPoller[F] + } + .flatMap { poller => + poller.registerFileDescriptor(STDIN_FILENO, true, false).mapK(LiftIO.liftK) + } + } + .flatMap { handle => + Stream.repeatEval { + handle + .pollReadRec(()) { _ => + IO { + val buf = new Array[Byte](bufSize) + val readed = guard(read(STDIN_FILENO, buf.at(0), bufSize.toULong)) + if (readed > 0) + Right(Some(Chunk.array(buf, 0, readed))) + else if (readed == 0) + Right(None) + else + Left(()) + } + } + .to + } + } + .unNoneTerminate + .unchunks + else + readInputStream(Sync[F].blocking(System.in), bufSize, false) + + /** Pipe of bytes that writes emitted values to standard output asynchronously. */ + def stdout[F[_]: Async: LiftIO]: Pipe[F, Byte, Nothing] = + if (LinktimeInfo.isLinux || LinktimeInfo.isMac) + writeFd(STDOUT_FILENO) + else + writeOutputStream(Sync[F].blocking(System.out), false) + + /** Pipe of bytes that writes emitted values to standard error asynchronously. */ + def stderr[F[_]: Async: LiftIO]: Pipe[F, Byte, Nothing] = + if (LinktimeInfo.isLinux || LinktimeInfo.isMac) + writeFd(STDERR_FILENO) + else + writeOutputStream(Sync[F].blocking(System.err), false) + + private[this] def writeFd[F[_]: Async: LiftIO](fd: Int): Pipe[F, Byte, Nothing] = in => + Stream + .resource { + Resource + .eval { + setNonBlocking(fd) *> fileDescriptorPoller[F] + } + .flatMap { poller => + poller.registerFileDescriptor(fd, false, true).mapK(LiftIO.liftK) + } + } + .flatMap { handle => + in.chunks.foreach { bytes => + val Chunk.ArraySlice(buf, offset, length) = bytes.toArraySlice + + def go(pos: Int): IO[Either[Int, Unit]] = + IO(write(fd, buf.at(offset + pos), (length - pos).toULong)).flatMap { wrote => + if (wrote >= 0) { + val newPos = pos + wrote + if (newPos < length) + go(newPos) + else + IO.pure(Either.unit) + } else + IO.pure(Left(pos)) + } + + handle.pollWriteRec(0)(go(_)).to + } + } + + /** Writes this stream to standard output asynchronously, converting each element to + * a sequence of bytes via `Show` and the given `Charset`. + */ + def stdoutLines[F[_]: Async: LiftIO, O: Show]( + charset: Charset = StandardCharsets.UTF_8 + ): Pipe[F, O, Nothing] = + _.map(_.show).through(text.encode(charset)).through(stdout) + + /** Stream of `String` read asynchronously from standard input decoded in UTF-8. */ + def stdinUtf8[F[_]: Async: LiftIO](bufSize: Int): Stream[F, String] = + stdin(bufSize).through(text.utf8.decode) + + @deprecated("Prefer non-blocking, async variant", "3.5.0") + def stdin[F[_]](bufSize: Int, F: Sync[F]): Stream[F, Byte] = + readInputStream(F.blocking(System.in), bufSize, false)(F) + + @deprecated("Prefer non-blocking, async variant", "3.5.0") + def stdout[F[_]](F: Sync[F]): Pipe[F, Byte, Nothing] = + writeOutputStream(F.blocking(System.out: OutputStream), false)(F) + + @deprecated("Prefer non-blocking, async variant", "3.5.0") + def stderr[F[_]](F: Sync[F]): Pipe[F, Byte, Nothing] = + writeOutputStream(F.blocking(System.err: OutputStream), false)(F) + + @deprecated("Prefer non-blocking, async variant", "3.5.0") + def stdoutLines[F[_], O](charset: Charset, F: Sync[F], O: Show[O]): Pipe[F, O, Nothing] = + _.map(O.show(_)).through(text.encode(charset)).through(stdout(F)) + + @deprecated("Prefer non-blocking, async variant", "3.5.0") + def stdinUtf8[F[_]](bufSize: Int, F: Sync[F]): Stream[F, String] = + stdin(bufSize, F).through(text.utf8.decode) + } From c27e03d3989ae705d4fc1d42ed262843817f5097 Mon Sep 17 00:00:00 2001 From: Arman Bilge Date: Thu, 22 Dec 2022 04:27:20 +0000 Subject: [PATCH 20/36] Fix unix socket error handling --- .../scala/fs2/io/internal/SocketHelpers.scala | 17 +++++++++++++++++ .../net/unixsocket/FdPollingUnixSockets.scala | 4 ++-- 2 files changed, 19 insertions(+), 2 deletions(-) diff --git a/io/native/src/main/scala/fs2/io/internal/SocketHelpers.scala b/io/native/src/main/scala/fs2/io/internal/SocketHelpers.scala index b0521ea82b..4523709a0f 100644 --- a/io/native/src/main/scala/fs2/io/internal/SocketHelpers.scala +++ b/io/native/src/main/scala/fs2/io/internal/SocketHelpers.scala @@ -33,6 +33,7 @@ import com.comcast.ip4s.SocketAddress import java.io.IOException import scala.scalanative.meta.LinktimeInfo import scala.scalanative.posix.arpa.inet._ +import scala.scalanative.posix.string._ import scala.scalanative.posix.sys.socket._ import scala.scalanative.posix.sys.socketOps._ import scala.scalanative.posix.unistd._ @@ -82,6 +83,22 @@ private[io] object SocketHelpers { ) } + def raiseSocketError[F[_]](fd: Int)(implicit F: Sync[F]): F[Unit] = F.delay { + val optval = stackalloc[CInt]() + val optlen = stackalloc[socklen_t]() + guard_ { + getsockopt( + fd, + SOL_SOCKET, + SO_ERROR, + optval.asInstanceOf[Ptr[Byte]], + optlen + ) + } + if (!optval != 0) + throw new IOException(fromCString(strerror(!optval))) + } + def getLocalAddress[F[_]](fd: Int)(implicit F: Sync[F]): F[SocketAddress[IpAddress]] = SocketHelpers.toSocketAddress { (addr, len) => F.delay(guard_(getsockname(fd, addr, len))) diff --git a/io/native/src/main/scala/fs2/io/net/unixsocket/FdPollingUnixSockets.scala b/io/native/src/main/scala/fs2/io/net/unixsocket/FdPollingUnixSockets.scala index 0e24f7935c..5587ce2bc4 100644 --- a/io/native/src/main/scala/fs2/io/net/unixsocket/FdPollingUnixSockets.scala +++ b/io/native/src/main/scala/fs2/io/net/unixsocket/FdPollingUnixSockets.scala @@ -57,12 +57,12 @@ private final class FdPollingUnixSockets[F[_]: Files: LiftIO](implicit F: Async[ toSockaddrUn(address.path).use { addr => handle .pollWriteRec(false) { connected => - if (connected) IO.pure(Either.unit) + if (connected) SocketHelpers.raiseSocketError[IO](fd).as(Either.unit) else IO { if (connect(fd, addr, sizeof[sockaddr_un].toUInt) < 0) { val e = errno - if (e == EINPROGRESS) + if (e == EAGAIN) Left(true) // we will be connected when we unblock else if (e == ECONNREFUSED) throw new ConnectException(fromCString(strerror(errno))) From ebc0ca52362289d28be487a8728e95db18550099 Mon Sep 17 00:00:00 2001 From: Arman Bilge Date: Thu, 22 Dec 2022 04:29:48 +0000 Subject: [PATCH 21/36] Simplify unix socket connect --- .../io/net/unixsocket/FdPollingUnixSockets.scala | 14 ++------------ 1 file changed, 2 insertions(+), 12 deletions(-) diff --git a/io/native/src/main/scala/fs2/io/net/unixsocket/FdPollingUnixSockets.scala b/io/native/src/main/scala/fs2/io/net/unixsocket/FdPollingUnixSockets.scala index 5587ce2bc4..939b8ef28b 100644 --- a/io/native/src/main/scala/fs2/io/net/unixsocket/FdPollingUnixSockets.scala +++ b/io/native/src/main/scala/fs2/io/net/unixsocket/FdPollingUnixSockets.scala @@ -37,9 +37,7 @@ import fs2.io.internal.syssocket._ import fs2.io.internal.sysun._ import fs2.io.internal.sysunOps._ -import scala.scalanative.libc.errno._ import scala.scalanative.meta.LinktimeInfo -import scala.scalanative.posix.errno._ import scala.scalanative.posix.string._ import scala.scalanative.posix.sys.socket.{bind => _, connect => _, accept => _, _} import scala.scalanative.posix.unistd._ @@ -60,16 +58,8 @@ private final class FdPollingUnixSockets[F[_]: Files: LiftIO](implicit F: Async[ if (connected) SocketHelpers.raiseSocketError[IO](fd).as(Either.unit) else IO { - if (connect(fd, addr, sizeof[sockaddr_un].toUInt) < 0) { - val e = errno - if (e == EAGAIN) - Left(true) // we will be connected when we unblock - else if (e == ECONNREFUSED) - throw new ConnectException(fromCString(strerror(errno))) - else - throw new IOException(fromCString(strerror(errno))) - } else - Either.unit + guard_(connect(fd, addr, sizeof[sockaddr_un].toUInt)) + Either.unit[Boolean] } } .to From 43b0c06a55e1863952ed331ffe6a7cf18c467257 Mon Sep 17 00:00:00 2001 From: Arman Bilge Date: Thu, 22 Dec 2022 04:58:09 +0000 Subject: [PATCH 22/36] Fix simplified unix socket connect --- .../scala/fs2/io/net/unixsocket/FdPollingUnixSockets.scala | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/io/native/src/main/scala/fs2/io/net/unixsocket/FdPollingUnixSockets.scala b/io/native/src/main/scala/fs2/io/net/unixsocket/FdPollingUnixSockets.scala index 939b8ef28b..cb48186df0 100644 --- a/io/native/src/main/scala/fs2/io/net/unixsocket/FdPollingUnixSockets.scala +++ b/io/native/src/main/scala/fs2/io/net/unixsocket/FdPollingUnixSockets.scala @@ -58,8 +58,10 @@ private final class FdPollingUnixSockets[F[_]: Files: LiftIO](implicit F: Async[ if (connected) SocketHelpers.raiseSocketError[IO](fd).as(Either.unit) else IO { - guard_(connect(fd, addr, sizeof[sockaddr_un].toUInt)) - Either.unit[Boolean] + if (guard(connect(fd, addr, sizeof[sockaddr_un].toUInt)) < 0) + Left(true) // we will be connected when unblocked + else + Either.unit[Boolean] } } .to From 6e25eab62bff48af343e65c96d14f2c74342e60b Mon Sep 17 00:00:00 2001 From: Arman Bilge Date: Thu, 22 Dec 2022 05:05:36 +0000 Subject: [PATCH 23/36] stackalloc `sockaddr_un` ftw --- .../net/unixsocket/FdPollingUnixSockets.scala | 47 +++++++++---------- 1 file changed, 22 insertions(+), 25 deletions(-) diff --git a/io/native/src/main/scala/fs2/io/net/unixsocket/FdPollingUnixSockets.scala b/io/native/src/main/scala/fs2/io/net/unixsocket/FdPollingUnixSockets.scala index cb48186df0..36e6403b16 100644 --- a/io/native/src/main/scala/fs2/io/net/unixsocket/FdPollingUnixSockets.scala +++ b/io/native/src/main/scala/fs2/io/net/unixsocket/FdPollingUnixSockets.scala @@ -52,20 +52,20 @@ private final class FdPollingUnixSockets[F[_]: Files: LiftIO](implicit F: Async[ fd <- SocketHelpers.openNonBlocking(AF_UNIX, SOCK_STREAM) handle <- poller.registerFileDescriptor(fd, true, true).mapK(LiftIO.liftK) _ <- Resource.eval { - toSockaddrUn(address.path).use { addr => - handle - .pollWriteRec(false) { connected => - if (connected) SocketHelpers.raiseSocketError[IO](fd).as(Either.unit) - else - IO { + handle + .pollWriteRec(false) { connected => + if (connected) SocketHelpers.raiseSocketError[IO](fd).as(Either.unit) + else + IO { + toSockaddrUn(address.path) { addr => if (guard(connect(fd, addr, sizeof[sockaddr_un].toUInt)) < 0) Left(true) // we will be connected when unblocked else Either.unit[Boolean] } - } - .to - } + } + } + .to } socket <- FdPollingSocket[F](fd, handle, raiseIpAddressError, raiseIpAddressError) } yield socket @@ -85,8 +85,8 @@ private final class FdPollingUnixSockets[F[_]: Files: LiftIO](implicit F: Async[ handle <- Stream.resource(poller.registerFileDescriptor(fd, true, false).mapK(LiftIO.liftK)) _ <- Stream.eval { - toSockaddrUn(address.path).use { addr => - F.delay(guard_(bind(fd, addr, sizeof[sockaddr_un].toUInt))) + F.delay { + toSockaddrUn(address.path)(addr => guard_(bind(fd, addr, sizeof[sockaddr_un].toUInt))) } *> F.delay(guard_(listen(fd, 0))) } @@ -129,20 +129,17 @@ private final class FdPollingUnixSockets[F[_]: Files: LiftIO](implicit F: Async[ } yield socket - private def toSockaddrUn(path: String): Resource[F, Ptr[sockaddr]] = - Resource.make(F.delay(Zone.open()))(z => F.delay(z.close())).evalMap[Ptr[sockaddr]] { - implicit z => - val pathBytes = path.getBytes - if (pathBytes.length > 107) - F.raiseError(new IllegalArgumentException(s"Path too long: $path")) - else - F.delay { - val addr = alloc[sockaddr_un]() - addr.sun_family = AF_UNIX.toUShort - memcpy(addr.sun_path.at(0), pathBytes.at(0), pathBytes.length.toULong) - addr.asInstanceOf[Ptr[sockaddr]] - } - } + private def toSockaddrUn[A](path: String)(f: Ptr[sockaddr] => A): A = { + val pathBytes = path.getBytes + if (pathBytes.length > 107) + throw new IllegalArgumentException(s"Path too long: $path") + + val addr = stackalloc[sockaddr_un]() + addr.sun_family = AF_UNIX.toUShort + memcpy(addr.sun_path.at(0), pathBytes.at(0), pathBytes.length.toULong) + + f(addr.asInstanceOf[Ptr[sockaddr]]) + } private def raiseIpAddressError[A]: F[A] = F.raiseError(new UnsupportedOperationException("UnixSockets do not use IP addressing")) From 1cf0db4e4860408fa79c73298781b26a6546a617 Mon Sep 17 00:00:00 2001 From: Arman Bilge Date: Thu, 22 Dec 2022 05:31:16 +0000 Subject: [PATCH 24/36] Add more socket option helpers --- .../scala/fs2/io/internal/SocketHelpers.scala | 76 +++++++++++++++---- 1 file changed, 62 insertions(+), 14 deletions(-) diff --git a/io/native/src/main/scala/fs2/io/internal/SocketHelpers.scala b/io/native/src/main/scala/fs2/io/internal/SocketHelpers.scala index 4523709a0f..5149b14369 100644 --- a/io/native/src/main/scala/fs2/io/internal/SocketHelpers.scala +++ b/io/native/src/main/scala/fs2/io/internal/SocketHelpers.scala @@ -31,8 +31,12 @@ import com.comcast.ip4s.Port import com.comcast.ip4s.SocketAddress import java.io.IOException +import java.net.SocketOption +import java.net.StandardSocketOptions import scala.scalanative.meta.LinktimeInfo import scala.scalanative.posix.arpa.inet._ +import scala.scalanative.posix.netinet.in.IPPROTO_TCP +import scala.scalanative.posix.netinet.tcp._ import scala.scalanative.posix.string._ import scala.scalanative.posix.sys.socket._ import scala.scalanative.posix.sys.socketOps._ @@ -68,14 +72,70 @@ private[io] object SocketHelpers { def setNoSigPipe[F[_]: Sync](fd: CInt): F[Unit] = setOption(fd, SO_NOSIGPIPE, true) + def setOption[F[_]: Sync, T](fd: CInt, name: SocketOption[T], value: T): F[Unit] = name match { + case StandardSocketOptions.SO_SNDBUF => + setOption( + fd, + SO_SNDBUF, + value.asInstanceOf[java.lang.Integer] + ) + case StandardSocketOptions.SO_RCVBUF => + setOption( + fd, + SO_RCVBUF, + value.asInstanceOf[java.lang.Integer] + ) + case StandardSocketOptions.SO_REUSEADDR => + setOption( + fd, + SO_REUSEADDR, + value.asInstanceOf[java.lang.Boolean] + ) + case StandardSocketOptions.SO_REUSEPORT => + SocketHelpers.setOption( + fd, + SO_REUSEPORT, + value.asInstanceOf[java.lang.Boolean] + ) + case StandardSocketOptions.SO_KEEPALIVE => + SocketHelpers.setOption( + fd, + SO_KEEPALIVE, + value.asInstanceOf[java.lang.Boolean] + ) + case StandardSocketOptions.TCP_NODELAY => + setTcpOption( + fd, + TCP_NODELAY, + value.asInstanceOf[java.lang.Boolean] + ) + case _ => throw new IllegalArgumentException + } + def setOption[F[_]](fd: CInt, option: CInt, value: Boolean)(implicit F: Sync[F]): F[Unit] = + setOptionImpl(fd, SOL_SOCKET, option, if (value) 1 else 0) + + def setOption[F[_]](fd: CInt, option: CInt, value: CInt)(implicit F: Sync[F]): F[Unit] = + setOptionImpl(fd, SOL_SOCKET, option, value) + + def setTcpOption[F[_]](fd: CInt, option: CInt, value: Boolean)(implicit F: Sync[F]): F[Unit] = + setOptionImpl( + fd, + IPPROTO_TCP, // aka SOL_TCP + option, + if (value) 1 else 0 + ) + + def setOptionImpl[F[_]](fd: CInt, level: CInt, option: CInt, value: CInt)(implicit + F: Sync[F] + ): F[Unit] = F.delay { val ptr = stackalloc[CInt]() - !ptr = if (value.asInstanceOf[java.lang.Boolean]) 1 else 0 + !ptr = value guard_( setsockopt( fd, - SOL_SOCKET, + level, option, ptr.asInstanceOf[Ptr[Byte]], sizeof[CInt].toUInt @@ -104,18 +164,6 @@ private[io] object SocketHelpers { F.delay(guard_(getsockname(fd, addr, len))) } - def allocateSockaddr[F[_]](implicit F: Sync[F]): Resource[F, (Ptr[sockaddr], Ptr[socklen_t])] = - Resource - .make(F.delay(Zone.open()))(z => F.delay(z.close())) - .evalMap { implicit z => - F.delay { - val addr = // allocate enough for an IPv6 - alloc[sockaddr_in6]().asInstanceOf[Ptr[sockaddr]] - val len = alloc[socklen_t]() - (addr, len) - } - } - def toSockaddr[A]( address: SocketAddress[IpAddress] )(f: (Ptr[sockaddr], socklen_t) => A): A = From 74b80f90ae8197fdb28e42260d2169b0a8c488c7 Mon Sep 17 00:00:00 2001 From: Arman Bilge Date: Thu, 22 Dec 2022 06:45:29 +0000 Subject: [PATCH 25/36] `accept4` is a linux thing --- .../scala/fs2/io/net/unixsocket/FdPollingUnixSockets.scala | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/io/native/src/main/scala/fs2/io/net/unixsocket/FdPollingUnixSockets.scala b/io/native/src/main/scala/fs2/io/net/unixsocket/FdPollingUnixSockets.scala index 36e6403b16..7802f5b81c 100644 --- a/io/native/src/main/scala/fs2/io/net/unixsocket/FdPollingUnixSockets.scala +++ b/io/native/src/main/scala/fs2/io/net/unixsocket/FdPollingUnixSockets.scala @@ -100,9 +100,9 @@ private final class FdPollingUnixSockets[F[_]: Files: LiftIO](implicit F: Async[ IO { val clientFd = if (LinktimeInfo.isLinux) - guard(accept(fd, null, null)) - else guard(accept4(fd, null, null, SOCK_NONBLOCK)) + else + guard(accept(fd, null, null)) if (clientFd >= 0) Right(clientFd) From 84fc9b1b5783e1c6335baeeaf2e837edb67efafa Mon Sep 17 00:00:00 2001 From: Arman Bilge Date: Thu, 22 Dec 2022 07:31:12 +0000 Subject: [PATCH 26/36] First attempt at fd polling socket group --- .../scala/fs2/io/internal/SocketHelpers.scala | 29 ++-- .../fs2/io/net/FdPollingSocketGroup.scala | 153 ++++++++++++++++++ 2 files changed, 172 insertions(+), 10 deletions(-) create mode 100644 io/native/src/main/scala/fs2/io/net/FdPollingSocketGroup.scala diff --git a/io/native/src/main/scala/fs2/io/internal/SocketHelpers.scala b/io/native/src/main/scala/fs2/io/internal/SocketHelpers.scala index 5149b14369..85c70e6383 100644 --- a/io/native/src/main/scala/fs2/io/internal/SocketHelpers.scala +++ b/io/native/src/main/scala/fs2/io/internal/SocketHelpers.scala @@ -160,8 +160,10 @@ private[io] object SocketHelpers { } def getLocalAddress[F[_]](fd: Int)(implicit F: Sync[F]): F[SocketAddress[IpAddress]] = - SocketHelpers.toSocketAddress { (addr, len) => - F.delay(guard_(getsockname(fd, addr, len))) + F.delay { + SocketHelpers.toSocketAddress { (addr, len) => + guard_(getsockname(fd, addr, len)) + } } def toSockaddr[A]( @@ -249,24 +251,31 @@ private[io] object SocketHelpers { } } - def toSocketAddress[F[_]]( - f: (Ptr[sockaddr], Ptr[socklen_t]) => F[Unit] - )(implicit F: Sync[F]): F[SocketAddress[IpAddress]] = { + def allocateSockaddr[A]( + f: (Ptr[sockaddr], Ptr[socklen_t]) => A + ): A = { val addr = // allocate enough for an IPv6 stackalloc[sockaddr_in6]().asInstanceOf[Ptr[sockaddr]] val len = stackalloc[socklen_t]() !len = sizeof[sockaddr_in6].toUInt - f(addr, len) *> toSocketAddress(addr) + f(addr, len) + } + + def toSocketAddress[A]( + f: (Ptr[sockaddr], Ptr[socklen_t]) => Unit + ): SocketAddress[IpAddress] = allocateSockaddr { (addr, len) => + f(addr, len) + toSocketAddress(addr) } - def toSocketAddress[F[_]](addr: Ptr[sockaddr])(implicit F: Sync[F]): F[SocketAddress[IpAddress]] = + def toSocketAddress(addr: Ptr[sockaddr]): SocketAddress[IpAddress] = if (addr.sa_family.toInt == AF_INET) - F.pure(toIpv4SocketAddress(addr.asInstanceOf[Ptr[sockaddr_in]])) + toIpv4SocketAddress(addr.asInstanceOf[Ptr[sockaddr_in]]) else if (addr.sa_family.toInt == AF_INET6) - F.pure(toIpv6SocketAddress(addr.asInstanceOf[Ptr[sockaddr_in6]])) + toIpv6SocketAddress(addr.asInstanceOf[Ptr[sockaddr_in6]]) else - F.raiseError(new IOException(s"Unsupported sa_family: ${addr.sa_family}")) + throw new IOException(s"Unsupported sa_family: ${addr.sa_family}") private[this] def toIpv4SocketAddress(addr: Ptr[sockaddr_in]): SocketAddress[Ipv4Address] = { val port = Port.fromInt(ntohs(addr.sin_port).toInt).get diff --git a/io/native/src/main/scala/fs2/io/net/FdPollingSocketGroup.scala b/io/native/src/main/scala/fs2/io/net/FdPollingSocketGroup.scala new file mode 100644 index 0000000000..7496395c98 --- /dev/null +++ b/io/native/src/main/scala/fs2/io/net/FdPollingSocketGroup.scala @@ -0,0 +1,153 @@ +/* + * Copyright (c) 2013 Functional Streams for Scala + * + * Permission is hereby granted, free of charge, to any person obtaining a copy of + * this software and associated documentation files (the "Software"), to deal in + * the Software without restriction, including without limitation the rights to + * use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of + * the Software, and to permit persons to whom the Software is furnished to do so, + * subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS + * FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR + * COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER + * IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN + * CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. + */ + +package fs2 +package io +package net + +import cats.effect.IO +import cats.effect.LiftIO +import cats.effect.kernel.Async +import cats.effect.kernel.Resource +import cats.syntax.all._ +import com.comcast.ip4s._ +import fs2.io.internal.NativeUtil._ +import fs2.io.internal.SocketHelpers +import fs2.io.internal.syssocket._ + +import scala.scalanative.libc.errno._ +import scala.scalanative.meta.LinktimeInfo +import scala.scalanative.posix.errno._ +import scala.scalanative.posix.string._ +import scala.scalanative.posix.sys.socket.{bind => _, connect => _, accept => _, _} +import scala.scalanative.posix.unistd._ +import scala.scalanative.unsafe._ + +private final class FdPollingSocketGroup[F[_]: Dns: LiftIO](implicit F: Async[F]) + extends SocketGroup[F] { + + def client(to: SocketAddress[Host], options: List[SocketOption]): Resource[F, Socket[F]] = for { + poller <- Resource.eval(fileDescriptorPoller[F]) + address <- Resource.eval(to.resolve) + ipv4 = address.host.isInstanceOf[Ipv4Address] + fd <- SocketHelpers.openNonBlocking(if (ipv4) AF_INET else AF_INET6, SOCK_STREAM) + _ <- Resource.eval(options.traverse(so => SocketHelpers.setOption(fd, so.key, so.value))) + handle <- poller.registerFileDescriptor(fd, true, true).mapK(LiftIO.liftK) + _ <- Resource.eval { + handle + .pollWriteRec(false) { connected => + if (connected) SocketHelpers.raiseSocketError[IO](fd).as(Either.unit) + else + IO { + SocketHelpers.toSockaddr(address) { (addr, len) => + if (connect(fd, addr, len) < 0) { + val e = errno + if (e == EINPROGRESS) + Left(true) // we will be connected when we unblock + else if (e == ECONNREFUSED) + throw new ConnectException(fromCString(strerror(errno))) + else + throw new IOException(fromCString(strerror(errno))) + } else + Either.unit + } + } + } + .to + } + socket <- FdPollingSocket[F](fd, handle, SocketHelpers.getLocalAddress(fd), F.pure(address)) + } yield socket + + def server( + address: Option[Host], + port: Option[Port], + options: List[SocketOption] + ): Stream[F, Socket[F]] = + Stream.resource(serverResource(address, port, options)).flatMap(_._2) + + def serverResource( + address: Option[Host], + port: Option[Port], + options: List[SocketOption] + ): Resource[F, (SocketAddress[IpAddress], Stream[F, Socket[F]])] = for { + poller <- Resource.eval(fileDescriptorPoller[F]) + address <- Resource.eval(address.fold(IpAddress.loopback)(_.resolve)) + ipv4 = address.isInstanceOf[Ipv4Address] + fd <- SocketHelpers.openNonBlocking(if (ipv4) AF_INET else AF_INET6, SOCK_STREAM) + handle <- poller.registerFileDescriptor(fd, true, false).mapK(LiftIO.liftK) + _ <- Resource.eval { + F.delay { + val socketAddress = SocketAddress(address, port.getOrElse(port"0")) + SocketHelpers.toSockaddr(socketAddress) { (addr, len) => + guard_(bind(fd, addr, len)) + } + } *> F.delay(guard_(listen(fd, 0))) + } + + sockets = Stream + .resource { + val accepted = for { + addrFd <- Resource.makeFull[F, (SocketAddress[IpAddress], Int)] { poll => + poll { + handle + .pollReadRec(()) { _ => + IO { + SocketHelpers.allocateSockaddr { (addr, len) => + val clientFd = + if (LinktimeInfo.isLinux) + guard(accept4(fd, addr, len, SOCK_NONBLOCK)) + else + guard(accept(fd, addr, len)) + + if (clientFd >= 0) { + val address = SocketHelpers.toSocketAddress(addr) + Right((address, clientFd)) + } else + Left(()) + } + } + } + .to + } + }(addrFd => F.delay(guard_(close(addrFd._2)))) + (address, fd) = addrFd + _ <- + if (!LinktimeInfo.isLinux) + Resource.eval(setNonBlocking(fd)) + else Resource.unit[F] + handle <- poller.registerFileDescriptor(fd, true, true).mapK(LiftIO.liftK) + socket <- FdPollingSocket[F]( + fd, + handle, + SocketHelpers.getLocalAddress(fd), + F.pure(address) + ) + } yield socket + + accepted.attempt.map(_.toOption) + } + .repeat + .unNone + + serverAddress <- Resource.eval(SocketHelpers.getLocalAddress(fd)) + } yield (serverAddress, sockets) + +} From 1678d527dafec9be2f7a61c7275541cb20a422ec Mon Sep 17 00:00:00 2001 From: Arman Bilge Date: Thu, 22 Dec 2022 07:47:30 +0000 Subject: [PATCH 27/36] `raiseSocketError` -> `checkSocketError` --- io/native/src/main/scala/fs2/io/internal/SocketHelpers.scala | 2 +- io/native/src/main/scala/fs2/io/net/FdPollingSocketGroup.scala | 2 +- .../main/scala/fs2/io/net/unixsocket/FdPollingUnixSockets.scala | 2 +- 3 files changed, 3 insertions(+), 3 deletions(-) diff --git a/io/native/src/main/scala/fs2/io/internal/SocketHelpers.scala b/io/native/src/main/scala/fs2/io/internal/SocketHelpers.scala index 85c70e6383..8ebeda78bf 100644 --- a/io/native/src/main/scala/fs2/io/internal/SocketHelpers.scala +++ b/io/native/src/main/scala/fs2/io/internal/SocketHelpers.scala @@ -143,7 +143,7 @@ private[io] object SocketHelpers { ) } - def raiseSocketError[F[_]](fd: Int)(implicit F: Sync[F]): F[Unit] = F.delay { + def checkSocketError[F[_]](fd: Int)(implicit F: Sync[F]): F[Unit] = F.delay { val optval = stackalloc[CInt]() val optlen = stackalloc[socklen_t]() guard_ { diff --git a/io/native/src/main/scala/fs2/io/net/FdPollingSocketGroup.scala b/io/native/src/main/scala/fs2/io/net/FdPollingSocketGroup.scala index 7496395c98..85591db730 100644 --- a/io/native/src/main/scala/fs2/io/net/FdPollingSocketGroup.scala +++ b/io/native/src/main/scala/fs2/io/net/FdPollingSocketGroup.scala @@ -54,7 +54,7 @@ private final class FdPollingSocketGroup[F[_]: Dns: LiftIO](implicit F: Async[F] _ <- Resource.eval { handle .pollWriteRec(false) { connected => - if (connected) SocketHelpers.raiseSocketError[IO](fd).as(Either.unit) + if (connected) SocketHelpers.checkSocketError[IO](fd).as(Either.unit) else IO { SocketHelpers.toSockaddr(address) { (addr, len) => diff --git a/io/native/src/main/scala/fs2/io/net/unixsocket/FdPollingUnixSockets.scala b/io/native/src/main/scala/fs2/io/net/unixsocket/FdPollingUnixSockets.scala index 7802f5b81c..a5dbd0858a 100644 --- a/io/native/src/main/scala/fs2/io/net/unixsocket/FdPollingUnixSockets.scala +++ b/io/native/src/main/scala/fs2/io/net/unixsocket/FdPollingUnixSockets.scala @@ -54,7 +54,7 @@ private final class FdPollingUnixSockets[F[_]: Files: LiftIO](implicit F: Async[ _ <- Resource.eval { handle .pollWriteRec(false) { connected => - if (connected) SocketHelpers.raiseSocketError[IO](fd).as(Either.unit) + if (connected) SocketHelpers.checkSocketError[IO](fd).as(Either.unit) else IO { toSockaddrUn(address.path) { addr => From b5b3a049ed64397ae5fca02a0f48d93c231cd64b Mon Sep 17 00:00:00 2001 From: Arman Bilge Date: Thu, 22 Dec 2022 07:55:20 +0000 Subject: [PATCH 28/36] Expose new polling system `Network` --- .../scala/fs2/io/net/NetworkPlatform.scala | 33 +++++++++++++++++-- 1 file changed, 30 insertions(+), 3 deletions(-) diff --git a/io/native/src/main/scala/fs2/io/net/NetworkPlatform.scala b/io/native/src/main/scala/fs2/io/net/NetworkPlatform.scala index f2aba3b524..7536484da5 100644 --- a/io/native/src/main/scala/fs2/io/net/NetworkPlatform.scala +++ b/io/native/src/main/scala/fs2/io/net/NetworkPlatform.scala @@ -23,9 +23,10 @@ package fs2 package io package net +import cats.effect.LiftIO import cats.effect.kernel.{Async, Resource} -import com.comcast.ip4s.{Host, IpAddress, Port, SocketAddress} +import com.comcast.ip4s.{Dns, Host, IpAddress, Port, SocketAddress} import fs2.io.net.tls.TLSContext @@ -33,9 +34,9 @@ private[net] trait NetworkPlatform[F[_]] private[net] trait NetworkCompanionPlatform { self: Network.type => - implicit def forAsync[F[_]](implicit F: Async[F]): Network[F] = + implicit def forAsync[F[_]: Async: Dns: LiftIO]: Network[F] = new UnsealedNetwork[F] { - private lazy val globalSocketGroup = SocketGroup.unsafe[F](null) + private lazy val globalSocketGroup = new FdPollingSocketGroup[F] def client( to: SocketAddress[Host], @@ -58,4 +59,30 @@ private[net] trait NetworkCompanionPlatform { self: Network.type => def tlsContext: TLSContext.Builder[F] = TLSContext.Builder.forAsync } + @deprecated("Prefer the IO polling system-based implementation", "3.5.0") + def forAsync[F[_]](F: Async[F]): Network[F] = + new UnsealedNetwork[F] { + private lazy val globalSocketGroup = SocketGroup.unsafe[F](null)(F) + + def client( + to: SocketAddress[Host], + options: List[SocketOption] + ): Resource[F, Socket[F]] = globalSocketGroup.client(to, options) + + def server( + address: Option[Host], + port: Option[Port], + options: List[SocketOption] + ): Stream[F, Socket[F]] = globalSocketGroup.server(address, port, options) + + def serverResource( + address: Option[Host], + port: Option[Port], + options: List[SocketOption] + ): Resource[F, (SocketAddress[IpAddress], Stream[F, Socket[F]])] = + globalSocketGroup.serverResource(address, port, options) + + def tlsContext: TLSContext.Builder[F] = TLSContext.Builder.forAsync(F) + } + } From e682a3777365a8d5a406007b8b87e4b0d792f297 Mon Sep 17 00:00:00 2001 From: Arman Bilge Date: Thu, 22 Dec 2022 08:05:47 +0000 Subject: [PATCH 29/36] Fix exceptions, tweak test --- .../scala/fs2/io/internal/NativeUtil.scala | 19 ++++++++++++++----- .../scala/fs2/io/net/tcp/SocketSuite.scala | 4 ++-- 2 files changed, 16 insertions(+), 7 deletions(-) diff --git a/io/native/src/main/scala/fs2/io/internal/NativeUtil.scala b/io/native/src/main/scala/fs2/io/internal/NativeUtil.scala index 80a0a89446..8887e8409c 100644 --- a/io/native/src/main/scala/fs2/io/internal/NativeUtil.scala +++ b/io/native/src/main/scala/fs2/io/internal/NativeUtil.scala @@ -23,13 +23,15 @@ package fs2.io.internal import cats.effect.Sync +import java.io.IOException +import java.net.BindException +import java.net.ConnectException import scala.scalanative.annotation.alwaysinline import scala.scalanative.libc.errno._ import scala.scalanative.posix.fcntl._ import scala.scalanative.posix.errno._ import scala.scalanative.posix.string._ import scala.scalanative.unsafe._ -import java.io.IOException private[io] object NativeUtil { @@ -41,11 +43,18 @@ private[io] object NativeUtil { @alwaysinline def guard(thunk: => CInt): CInt = { val rtn = thunk if (rtn < 0) { - val en = errno - if (en == EAGAIN || en == EWOULDBLOCK) + val e = errno + if (e == EAGAIN || e == EWOULDBLOCK) rtn - else - throw new IOException(fromCString(strerror(errno))) + else { + val msg = fromCString(strerror(e)) + if (e == EADDRINUSE /* || e == EADDRNOTAVAIL */ ) + throw new BindException(msg) + else if (e == ECONNREFUSED) + throw new ConnectException(msg) + else + throw new IOException(msg) + } } else rtn } diff --git a/io/shared/src/test/scala/fs2/io/net/tcp/SocketSuite.scala b/io/shared/src/test/scala/fs2/io/net/tcp/SocketSuite.scala index d0c28e0c73..fa240abd60 100644 --- a/io/shared/src/test/scala/fs2/io/net/tcp/SocketSuite.scala +++ b/io/shared/src/test/scala/fs2/io/net/tcp/SocketSuite.scala @@ -218,7 +218,7 @@ class SocketSuite extends Fs2Suite with SocketSuitePlatform { } } - test("read after timed out read not allowed on JVM or Native") { + test("read after timed out read not allowed on JVM") { val setup = for { serverSetup <- Network[IO].serverResource(Some(ip"127.0.0.1")) (bindAddress, server) = serverSetup @@ -239,7 +239,7 @@ class SocketSuite extends Fs2Suite with SocketSuitePlatform { client .readN(msg.size) .flatMap { c => - if (isJVM || isNative) { + if (isJVM) { assertEquals(c.size, 0) // Read again now that the pending read is no longer pending client.readN(msg.size).map(c => assertEquals(c.size, 0)) From 220f6bcdb746d00131b6e77b942bb1d3d9baa170 Mon Sep 17 00:00:00 2001 From: Arman Bilge Date: Thu, 22 Dec 2022 17:12:10 +0000 Subject: [PATCH 30/36] Add forgotten `guard`s --- .../scala/fs2/io/internal/NativeUtil.scala | 31 +++++++++++++------ .../scala/fs2/io/net/FdPollingSocket.scala | 4 +-- .../fs2/io/net/FdPollingSocketGroup.scala | 6 +--- 3 files changed, 25 insertions(+), 16 deletions(-) diff --git a/io/native/src/main/scala/fs2/io/internal/NativeUtil.scala b/io/native/src/main/scala/fs2/io/internal/NativeUtil.scala index 8887e8409c..00850c5660 100644 --- a/io/native/src/main/scala/fs2/io/internal/NativeUtil.scala +++ b/io/native/src/main/scala/fs2/io/internal/NativeUtil.scala @@ -46,19 +46,32 @@ private[io] object NativeUtil { val e = errno if (e == EAGAIN || e == EWOULDBLOCK) rtn - else { - val msg = fromCString(strerror(e)) - if (e == EADDRINUSE /* || e == EADDRNOTAVAIL */ ) - throw new BindException(msg) - else if (e == ECONNREFUSED) - throw new ConnectException(msg) - else - throw new IOException(msg) - } + else throw errnoToThrowable(e) } else rtn } + @alwaysinline def guardSSize(thunk: => CSSize): CSSize = { + val rtn = thunk + if (rtn < 0) { + val e = errno + if (e == EAGAIN || e == EWOULDBLOCK) + rtn + else throw errnoToThrowable(e) + } else + rtn + } + + @alwaysinline def errnoToThrowable(e: CInt): Throwable = { + val msg = fromCString(strerror(e)) + if (e == EADDRINUSE /* || e == EADDRNOTAVAIL */ ) + new BindException(msg) + else if (e == ECONNREFUSED) + new ConnectException(msg) + else + new IOException(msg) + } + def setNonBlocking[F[_]](fd: CInt)(implicit F: Sync[F]): F[Unit] = F.delay { guard_(fcntl(fd, F_SETFL, O_NONBLOCK)) } diff --git a/io/native/src/main/scala/fs2/io/net/FdPollingSocket.scala b/io/native/src/main/scala/fs2/io/net/FdPollingSocket.scala index 1fb4e7a71e..821268de46 100644 --- a/io/native/src/main/scala/fs2/io/net/FdPollingSocket.scala +++ b/io/native/src/main/scala/fs2/io/net/FdPollingSocket.scala @@ -94,9 +94,9 @@ private final class FdPollingSocket[F[_]: LiftIO] private ( def go(pos: Int): IO[Either[Int, Unit]] = IO { if (LinktimeInfo.isLinux) - send(fd, buf.at(offset + pos), (length - pos).toULong, MSG_NOSIGNAL).toInt + guardSSize(send(fd, buf.at(offset + pos), (length - pos).toULong, MSG_NOSIGNAL)).toInt else - unistd.write(fd, buf.at(offset + pos), (length - pos).toULong) + guard(unistd.write(fd, buf.at(offset + pos), (length - pos).toULong)) }.flatMap { wrote => if (wrote >= 0) { val newPos = pos + wrote diff --git a/io/native/src/main/scala/fs2/io/net/FdPollingSocketGroup.scala b/io/native/src/main/scala/fs2/io/net/FdPollingSocketGroup.scala index 85591db730..aa9a947276 100644 --- a/io/native/src/main/scala/fs2/io/net/FdPollingSocketGroup.scala +++ b/io/native/src/main/scala/fs2/io/net/FdPollingSocketGroup.scala @@ -36,10 +36,8 @@ import fs2.io.internal.syssocket._ import scala.scalanative.libc.errno._ import scala.scalanative.meta.LinktimeInfo import scala.scalanative.posix.errno._ -import scala.scalanative.posix.string._ import scala.scalanative.posix.sys.socket.{bind => _, connect => _, accept => _, _} import scala.scalanative.posix.unistd._ -import scala.scalanative.unsafe._ private final class FdPollingSocketGroup[F[_]: Dns: LiftIO](implicit F: Async[F]) extends SocketGroup[F] { @@ -62,10 +60,8 @@ private final class FdPollingSocketGroup[F[_]: Dns: LiftIO](implicit F: Async[F] val e = errno if (e == EINPROGRESS) Left(true) // we will be connected when we unblock - else if (e == ECONNREFUSED) - throw new ConnectException(fromCString(strerror(errno))) else - throw new IOException(fromCString(strerror(errno))) + throw errnoToThrowable(e) } else Either.unit } From ef299db90ad01ae40f643c96a5bcb695439c9e14 Mon Sep 17 00:00:00 2001 From: Arman Bilge Date: Thu, 22 Dec 2022 19:37:00 +0000 Subject: [PATCH 31/36] Workaround BSD `sa_family` quirk --- .../scala/fs2/io/internal/SocketHelpers.scala | 13 ++++++-- .../main/scala/fs2/io/internal/netinet.scala | 31 ++++++++++++++----- .../scala/fs2/io/internal/syssocket.scala | 5 +++ 3 files changed, 39 insertions(+), 10 deletions(-) diff --git a/io/native/src/main/scala/fs2/io/internal/SocketHelpers.scala b/io/native/src/main/scala/fs2/io/internal/SocketHelpers.scala index 8ebeda78bf..1fd40ce28c 100644 --- a/io/native/src/main/scala/fs2/io/internal/SocketHelpers.scala +++ b/io/native/src/main/scala/fs2/io/internal/SocketHelpers.scala @@ -269,13 +269,20 @@ private[io] object SocketHelpers { toSocketAddress(addr) } - def toSocketAddress(addr: Ptr[sockaddr]): SocketAddress[IpAddress] = - if (addr.sa_family.toInt == AF_INET) + def toSocketAddress(addr: Ptr[sockaddr]): SocketAddress[IpAddress] = { + val sa_family = + if (LinktimeInfo.isMac || LinktimeInfo.isFreeBSD) + addr.sa_family.asInstanceOf[bsd_len_family]._2.toInt + else + addr.sa_family.toInt + + if (sa_family == AF_INET) toIpv4SocketAddress(addr.asInstanceOf[Ptr[sockaddr_in]]) - else if (addr.sa_family.toInt == AF_INET6) + else if (sa_family == AF_INET6) toIpv6SocketAddress(addr.asInstanceOf[Ptr[sockaddr_in6]]) else throw new IOException(s"Unsupported sa_family: ${addr.sa_family}") + } private[this] def toIpv4SocketAddress(addr: Ptr[sockaddr_in]): SocketAddress[Ipv4Address] = { val port = Port.fromInt(ntohs(addr.sin_port).toInt).get diff --git a/io/native/src/main/scala/fs2/io/internal/netinet.scala b/io/native/src/main/scala/fs2/io/internal/netinet.scala index fdb1dc4afc..29b07f9990 100644 --- a/io/native/src/main/scala/fs2/io/internal/netinet.scala +++ b/io/native/src/main/scala/fs2/io/internal/netinet.scala @@ -21,9 +21,12 @@ package fs2.io.internal -import scalanative.unsafe._ -import scalanative.posix.inttypes._ -import scalanative.posix.sys.socket._ +import scala.scalanative.meta.LinktimeInfo +import scala.scalanative.posix.inttypes._ +import scala.scalanative.posix.sys.socket._ +import scala.scalanative.unsafe._ + +import syssocket.bsd_len_family private[io] object netinetin { import Nat._ @@ -61,8 +64,16 @@ private[io] object netinetinOps { } implicit final class sockaddr_inOps(val sockaddr_in: Ptr[sockaddr_in]) extends AnyVal { - def sin_family: sa_family_t = sockaddr_in._1 - def sin_family_=(sin_family: sa_family_t): Unit = sockaddr_in._1 = sin_family + def sin_family: sa_family_t = + if (LinktimeInfo.isMac || LinktimeInfo.isFreeBSD) + sockaddr_in.at1.asInstanceOf[bsd_len_family]._2 + else + sockaddr_in._1 + def sin_family_=(sin_family: sa_family_t): Unit = + if (LinktimeInfo.isMac || LinktimeInfo.isFreeBSD) + sockaddr_in.at1.asInstanceOf[bsd_len_family]._2 = sin_family.toUByte + else + sockaddr_in._1 = sin_family def sin_port: in_port_t = sockaddr_in._2 def sin_port_=(sin_port: in_port_t): Unit = sockaddr_in._2 = sin_port def sin_addr: in_addr = sockaddr_in._3 @@ -75,8 +86,14 @@ private[io] object netinetinOps { } implicit final class sockaddr_in6Ops(val sockaddr_in6: Ptr[sockaddr_in6]) extends AnyVal { - def sin6_family: sa_family_t = sockaddr_in6._1 - def sin6_family_=(sin6_family: sa_family_t): Unit = sockaddr_in6._1 = sin6_family + def sin6_family: sa_family_t = if (LinktimeInfo.isMac || LinktimeInfo.isFreeBSD) + sockaddr_in6.asInstanceOf[bsd_len_family]._2 + else + sockaddr_in6._1 + def sin6_family_=(sin6_family: sa_family_t): Unit = + if (LinktimeInfo.isMac || LinktimeInfo.isFreeBSD) + sockaddr_in6.at1.asInstanceOf[bsd_len_family]._2 = sin6_family.toUByte + else sockaddr_in6._1 = sin6_family def sin6_port: in_port_t = sockaddr_in6._2 def sin6_port_=(sin6_port: in_port_t): Unit = sockaddr_in6._2 = sin6_port def sin6_flowinfo: uint32_t = sockaddr_in6._3 diff --git a/io/native/src/main/scala/fs2/io/internal/syssocket.scala b/io/native/src/main/scala/fs2/io/internal/syssocket.scala index 984be7acb0..631d4f69b1 100644 --- a/io/native/src/main/scala/fs2/io/internal/syssocket.scala +++ b/io/native/src/main/scala/fs2/io/internal/syssocket.scala @@ -21,11 +21,16 @@ package fs2.io.internal +import scala.scalanative.posix.inttypes._ import scala.scalanative.posix.sys.socket._ import scala.scalanative.unsafe._ +import syssocket._ + @extern private[io] object syssocket { + type bsd_len_family = CStruct2[uint8_t, uint8_t] + // only in Linux and FreeBSD, but not macOS final val SOCK_NONBLOCK = 2048 From f6ecd2b1c3021bce175dd24e41e6c62a9f0af671 Mon Sep 17 00:00:00 2001 From: Arman Bilge Date: Thu, 22 Dec 2022 19:40:58 +0000 Subject: [PATCH 32/36] Unused import --- io/native/src/main/scala/fs2/io/internal/syssocket.scala | 2 -- 1 file changed, 2 deletions(-) diff --git a/io/native/src/main/scala/fs2/io/internal/syssocket.scala b/io/native/src/main/scala/fs2/io/internal/syssocket.scala index 631d4f69b1..46b4782d85 100644 --- a/io/native/src/main/scala/fs2/io/internal/syssocket.scala +++ b/io/native/src/main/scala/fs2/io/internal/syssocket.scala @@ -25,8 +25,6 @@ import scala.scalanative.posix.inttypes._ import scala.scalanative.posix.sys.socket._ import scala.scalanative.unsafe._ -import syssocket._ - @extern private[io] object syssocket { type bsd_len_family = CStruct2[uint8_t, uint8_t] From 830564afb39a8db9e261c98622b75d2e3def3b6a Mon Sep 17 00:00:00 2001 From: Arman Bilge Date: Thu, 22 Dec 2022 15:23:11 -0500 Subject: [PATCH 33/36] Fixing+debugging --- .../src/main/scala/fs2/io/internal/SocketHelpers.scala | 9 ++------- io/native/src/main/scala/fs2/io/internal/netinet.scala | 8 ++++---- 2 files changed, 6 insertions(+), 11 deletions(-) diff --git a/io/native/src/main/scala/fs2/io/internal/SocketHelpers.scala b/io/native/src/main/scala/fs2/io/internal/SocketHelpers.scala index 1fd40ce28c..e5b7e04f08 100644 --- a/io/native/src/main/scala/fs2/io/internal/SocketHelpers.scala +++ b/io/native/src/main/scala/fs2/io/internal/SocketHelpers.scala @@ -270,18 +270,13 @@ private[io] object SocketHelpers { } def toSocketAddress(addr: Ptr[sockaddr]): SocketAddress[IpAddress] = { - val sa_family = - if (LinktimeInfo.isMac || LinktimeInfo.isFreeBSD) - addr.sa_family.asInstanceOf[bsd_len_family]._2.toInt - else - addr.sa_family.toInt - + val sa_family = addr.sa_family.toInt if (sa_family == AF_INET) toIpv4SocketAddress(addr.asInstanceOf[Ptr[sockaddr_in]]) else if (sa_family == AF_INET6) toIpv6SocketAddress(addr.asInstanceOf[Ptr[sockaddr_in6]]) else - throw new IOException(s"Unsupported sa_family: ${addr.sa_family}") + throw new IOException(s"Unsupported sa_family: $sa_family") } private[this] def toIpv4SocketAddress(addr: Ptr[sockaddr_in]): SocketAddress[Ipv4Address] = { diff --git a/io/native/src/main/scala/fs2/io/internal/netinet.scala b/io/native/src/main/scala/fs2/io/internal/netinet.scala index 29b07f9990..3f24773444 100644 --- a/io/native/src/main/scala/fs2/io/internal/netinet.scala +++ b/io/native/src/main/scala/fs2/io/internal/netinet.scala @@ -66,12 +66,12 @@ private[io] object netinetinOps { implicit final class sockaddr_inOps(val sockaddr_in: Ptr[sockaddr_in]) extends AnyVal { def sin_family: sa_family_t = if (LinktimeInfo.isMac || LinktimeInfo.isFreeBSD) - sockaddr_in.at1.asInstanceOf[bsd_len_family]._2 + sockaddr_in.at1.asInstanceOf[Ptr[bsd_len_family]]._2 else sockaddr_in._1 def sin_family_=(sin_family: sa_family_t): Unit = if (LinktimeInfo.isMac || LinktimeInfo.isFreeBSD) - sockaddr_in.at1.asInstanceOf[bsd_len_family]._2 = sin_family.toUByte + sockaddr_in.at1.asInstanceOf[Ptr[bsd_len_family]]._2 = sin_family.toUByte else sockaddr_in._1 = sin_family def sin_port: in_port_t = sockaddr_in._2 @@ -87,12 +87,12 @@ private[io] object netinetinOps { implicit final class sockaddr_in6Ops(val sockaddr_in6: Ptr[sockaddr_in6]) extends AnyVal { def sin6_family: sa_family_t = if (LinktimeInfo.isMac || LinktimeInfo.isFreeBSD) - sockaddr_in6.asInstanceOf[bsd_len_family]._2 + sockaddr_in6.asInstanceOf[Ptr[bsd_len_family]]._2 else sockaddr_in6._1 def sin6_family_=(sin6_family: sa_family_t): Unit = if (LinktimeInfo.isMac || LinktimeInfo.isFreeBSD) - sockaddr_in6.at1.asInstanceOf[bsd_len_family]._2 = sin6_family.toUByte + sockaddr_in6.at1.asInstanceOf[Ptr[bsd_len_family]]._2 = sin6_family.toUByte else sockaddr_in6._1 = sin6_family def sin6_port: in_port_t = sockaddr_in6._2 def sin6_port_=(sin6_port: in_port_t): Unit = sockaddr_in6._2 = sin6_port From 0ece5fc809346506054ddba059287621853b3b76 Mon Sep 17 00:00:00 2001 From: Arman Bilge Date: Thu, 22 Dec 2022 22:53:57 +0000 Subject: [PATCH 34/36] Revert "Workaround BSD `sa_family` quirk" This reverts commit ef299db90ad01ae40f643c96a5bcb695439c9e14. --- .../scala/fs2/io/internal/SocketHelpers.scala | 10 +++--- .../main/scala/fs2/io/internal/netinet.scala | 31 +++++-------------- .../scala/fs2/io/internal/syssocket.scala | 3 -- 3 files changed, 11 insertions(+), 33 deletions(-) diff --git a/io/native/src/main/scala/fs2/io/internal/SocketHelpers.scala b/io/native/src/main/scala/fs2/io/internal/SocketHelpers.scala index e5b7e04f08..8ebeda78bf 100644 --- a/io/native/src/main/scala/fs2/io/internal/SocketHelpers.scala +++ b/io/native/src/main/scala/fs2/io/internal/SocketHelpers.scala @@ -269,15 +269,13 @@ private[io] object SocketHelpers { toSocketAddress(addr) } - def toSocketAddress(addr: Ptr[sockaddr]): SocketAddress[IpAddress] = { - val sa_family = addr.sa_family.toInt - if (sa_family == AF_INET) + def toSocketAddress(addr: Ptr[sockaddr]): SocketAddress[IpAddress] = + if (addr.sa_family.toInt == AF_INET) toIpv4SocketAddress(addr.asInstanceOf[Ptr[sockaddr_in]]) - else if (sa_family == AF_INET6) + else if (addr.sa_family.toInt == AF_INET6) toIpv6SocketAddress(addr.asInstanceOf[Ptr[sockaddr_in6]]) else - throw new IOException(s"Unsupported sa_family: $sa_family") - } + throw new IOException(s"Unsupported sa_family: ${addr.sa_family}") private[this] def toIpv4SocketAddress(addr: Ptr[sockaddr_in]): SocketAddress[Ipv4Address] = { val port = Port.fromInt(ntohs(addr.sin_port).toInt).get diff --git a/io/native/src/main/scala/fs2/io/internal/netinet.scala b/io/native/src/main/scala/fs2/io/internal/netinet.scala index 3f24773444..fdb1dc4afc 100644 --- a/io/native/src/main/scala/fs2/io/internal/netinet.scala +++ b/io/native/src/main/scala/fs2/io/internal/netinet.scala @@ -21,12 +21,9 @@ package fs2.io.internal -import scala.scalanative.meta.LinktimeInfo -import scala.scalanative.posix.inttypes._ -import scala.scalanative.posix.sys.socket._ -import scala.scalanative.unsafe._ - -import syssocket.bsd_len_family +import scalanative.unsafe._ +import scalanative.posix.inttypes._ +import scalanative.posix.sys.socket._ private[io] object netinetin { import Nat._ @@ -64,16 +61,8 @@ private[io] object netinetinOps { } implicit final class sockaddr_inOps(val sockaddr_in: Ptr[sockaddr_in]) extends AnyVal { - def sin_family: sa_family_t = - if (LinktimeInfo.isMac || LinktimeInfo.isFreeBSD) - sockaddr_in.at1.asInstanceOf[Ptr[bsd_len_family]]._2 - else - sockaddr_in._1 - def sin_family_=(sin_family: sa_family_t): Unit = - if (LinktimeInfo.isMac || LinktimeInfo.isFreeBSD) - sockaddr_in.at1.asInstanceOf[Ptr[bsd_len_family]]._2 = sin_family.toUByte - else - sockaddr_in._1 = sin_family + def sin_family: sa_family_t = sockaddr_in._1 + def sin_family_=(sin_family: sa_family_t): Unit = sockaddr_in._1 = sin_family def sin_port: in_port_t = sockaddr_in._2 def sin_port_=(sin_port: in_port_t): Unit = sockaddr_in._2 = sin_port def sin_addr: in_addr = sockaddr_in._3 @@ -86,14 +75,8 @@ private[io] object netinetinOps { } implicit final class sockaddr_in6Ops(val sockaddr_in6: Ptr[sockaddr_in6]) extends AnyVal { - def sin6_family: sa_family_t = if (LinktimeInfo.isMac || LinktimeInfo.isFreeBSD) - sockaddr_in6.asInstanceOf[Ptr[bsd_len_family]]._2 - else - sockaddr_in6._1 - def sin6_family_=(sin6_family: sa_family_t): Unit = - if (LinktimeInfo.isMac || LinktimeInfo.isFreeBSD) - sockaddr_in6.at1.asInstanceOf[Ptr[bsd_len_family]]._2 = sin6_family.toUByte - else sockaddr_in6._1 = sin6_family + def sin6_family: sa_family_t = sockaddr_in6._1 + def sin6_family_=(sin6_family: sa_family_t): Unit = sockaddr_in6._1 = sin6_family def sin6_port: in_port_t = sockaddr_in6._2 def sin6_port_=(sin6_port: in_port_t): Unit = sockaddr_in6._2 = sin6_port def sin6_flowinfo: uint32_t = sockaddr_in6._3 diff --git a/io/native/src/main/scala/fs2/io/internal/syssocket.scala b/io/native/src/main/scala/fs2/io/internal/syssocket.scala index 46b4782d85..984be7acb0 100644 --- a/io/native/src/main/scala/fs2/io/internal/syssocket.scala +++ b/io/native/src/main/scala/fs2/io/internal/syssocket.scala @@ -21,14 +21,11 @@ package fs2.io.internal -import scala.scalanative.posix.inttypes._ import scala.scalanative.posix.sys.socket._ import scala.scalanative.unsafe._ @extern private[io] object syssocket { - type bsd_len_family = CStruct2[uint8_t, uint8_t] - // only in Linux and FreeBSD, but not macOS final val SOCK_NONBLOCK = 2048 From 26a4e4f73fae9f4eaf86ebe24f66201fe4f6543d Mon Sep 17 00:00:00 2001 From: Arman Bilge Date: Fri, 23 Dec 2022 00:05:03 +0000 Subject: [PATCH 35/36] Explicitly track if ipv4/ipv6 socket --- .../scala/fs2/io/internal/SocketHelpers.scala | 19 +++++++++---------- .../fs2/io/net/FdPollingSocketGroup.scala | 13 +++++++++---- 2 files changed, 18 insertions(+), 14 deletions(-) diff --git a/io/native/src/main/scala/fs2/io/internal/SocketHelpers.scala b/io/native/src/main/scala/fs2/io/internal/SocketHelpers.scala index 8ebeda78bf..f8e8e82ff7 100644 --- a/io/native/src/main/scala/fs2/io/internal/SocketHelpers.scala +++ b/io/native/src/main/scala/fs2/io/internal/SocketHelpers.scala @@ -39,7 +39,6 @@ import scala.scalanative.posix.netinet.in.IPPROTO_TCP import scala.scalanative.posix.netinet.tcp._ import scala.scalanative.posix.string._ import scala.scalanative.posix.sys.socket._ -import scala.scalanative.posix.sys.socketOps._ import scala.scalanative.posix.unistd._ import scala.scalanative.unsafe._ import scala.scalanative.unsigned._ @@ -159,9 +158,11 @@ private[io] object SocketHelpers { throw new IOException(fromCString(strerror(!optval))) } - def getLocalAddress[F[_]](fd: Int)(implicit F: Sync[F]): F[SocketAddress[IpAddress]] = + def getLocalAddress[F[_]](fd: Int, ipv4: Boolean)(implicit + F: Sync[F] + ): F[SocketAddress[IpAddress]] = F.delay { - SocketHelpers.toSocketAddress { (addr, len) => + SocketHelpers.toSocketAddress(ipv4) { (addr, len) => guard_(getsockname(fd, addr, len)) } } @@ -262,20 +263,18 @@ private[io] object SocketHelpers { f(addr, len) } - def toSocketAddress[A]( + def toSocketAddress[A](ipv4: Boolean)( f: (Ptr[sockaddr], Ptr[socklen_t]) => Unit ): SocketAddress[IpAddress] = allocateSockaddr { (addr, len) => f(addr, len) - toSocketAddress(addr) + toSocketAddress(addr, ipv4) } - def toSocketAddress(addr: Ptr[sockaddr]): SocketAddress[IpAddress] = - if (addr.sa_family.toInt == AF_INET) + def toSocketAddress(addr: Ptr[sockaddr], ipv4: Boolean): SocketAddress[IpAddress] = + if (ipv4) toIpv4SocketAddress(addr.asInstanceOf[Ptr[sockaddr_in]]) - else if (addr.sa_family.toInt == AF_INET6) - toIpv6SocketAddress(addr.asInstanceOf[Ptr[sockaddr_in6]]) else - throw new IOException(s"Unsupported sa_family: ${addr.sa_family}") + toIpv6SocketAddress(addr.asInstanceOf[Ptr[sockaddr_in6]]) private[this] def toIpv4SocketAddress(addr: Ptr[sockaddr_in]): SocketAddress[Ipv4Address] = { val port = Port.fromInt(ntohs(addr.sin_port).toInt).get diff --git a/io/native/src/main/scala/fs2/io/net/FdPollingSocketGroup.scala b/io/native/src/main/scala/fs2/io/net/FdPollingSocketGroup.scala index aa9a947276..fab3d28e65 100644 --- a/io/native/src/main/scala/fs2/io/net/FdPollingSocketGroup.scala +++ b/io/native/src/main/scala/fs2/io/net/FdPollingSocketGroup.scala @@ -69,7 +69,12 @@ private final class FdPollingSocketGroup[F[_]: Dns: LiftIO](implicit F: Async[F] } .to } - socket <- FdPollingSocket[F](fd, handle, SocketHelpers.getLocalAddress(fd), F.pure(address)) + socket <- FdPollingSocket[F]( + fd, + handle, + SocketHelpers.getLocalAddress(fd, ipv4), + F.pure(address) + ) } yield socket def server( @@ -114,7 +119,7 @@ private final class FdPollingSocketGroup[F[_]: Dns: LiftIO](implicit F: Async[F] guard(accept(fd, addr, len)) if (clientFd >= 0) { - val address = SocketHelpers.toSocketAddress(addr) + val address = SocketHelpers.toSocketAddress(addr, ipv4) Right((address, clientFd)) } else Left(()) @@ -133,7 +138,7 @@ private final class FdPollingSocketGroup[F[_]: Dns: LiftIO](implicit F: Async[F] socket <- FdPollingSocket[F]( fd, handle, - SocketHelpers.getLocalAddress(fd), + SocketHelpers.getLocalAddress(fd, ipv4), F.pure(address) ) } yield socket @@ -143,7 +148,7 @@ private final class FdPollingSocketGroup[F[_]: Dns: LiftIO](implicit F: Async[F] .repeat .unNone - serverAddress <- Resource.eval(SocketHelpers.getLocalAddress(fd)) + serverAddress <- Resource.eval(SocketHelpers.getLocalAddress(fd, ipv4)) } yield (serverAddress, sockets) } From b0f71fe606f8c377145fda23cb98948f72c88c7c Mon Sep 17 00:00:00 2001 From: Arman Bilge Date: Fri, 23 Dec 2022 01:26:29 +0000 Subject: [PATCH 36/36] Fix Scala 3 compile --- io/native/src/main/scala/fs2/io/ioplatform.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/io/native/src/main/scala/fs2/io/ioplatform.scala b/io/native/src/main/scala/fs2/io/ioplatform.scala index 66e96834b8..66b7248be6 100644 --- a/io/native/src/main/scala/fs2/io/ioplatform.scala +++ b/io/native/src/main/scala/fs2/io/ioplatform.scala @@ -141,7 +141,7 @@ private[fs2] trait ioplatform extends iojvmnative { def stdoutLines[F[_]: Async: LiftIO, O: Show]( charset: Charset = StandardCharsets.UTF_8 ): Pipe[F, O, Nothing] = - _.map(_.show).through(text.encode(charset)).through(stdout) + _.map(_.show).through(text.encode(charset)).through(stdout(implicitly, implicitly)) /** Stream of `String` read asynchronously from standard input decoded in UTF-8. */ def stdinUtf8[F[_]: Async: LiftIO](bufSize: Int): Stream[F, String] =