Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Native Polling System #3314

Closed
Closed
Show file tree
Hide file tree
Changes from 7 commits
Commits
Show all changes
31 commits
Select commit Hold shift + click to select a range
6d23310
Extract `PollingSystem` abstraction on Native
armanbilge Dec 11, 2022
727bfe8
Add `FileDescriptorPoller` abstraction
armanbilge Dec 11, 2022
9a8f7ed
Add `EpollSystem` and `KqueueSystem`
armanbilge Dec 11, 2022
0889426
Add test for `Scheduler#sleep`
armanbilge Dec 11, 2022
4250049
Add `FileDescriptorPollerSpec`
armanbilge Dec 11, 2022
0b9ac02
Consistent error-handling in `KqueueSystem`
armanbilge Dec 11, 2022
956734a
Make `pollingSystem` configurable in `IOApp`
armanbilge Dec 11, 2022
dda54b7
Nowarn unuseds
armanbilge Dec 12, 2022
1206b27
Revise the fd poller spec
armanbilge Dec 12, 2022
e0c4ec3
Remove `maxEvents` config from `EpollSystem`
armanbilge Dec 12, 2022
eeeb3e6
Remove `maxEvents` config from `KqueueSystem`
armanbilge Dec 12, 2022
c4a0a16
Add test for many simultaneous events
armanbilge Dec 12, 2022
aae6e97
Remove redundant `final`
armanbilge Dec 12, 2022
457f89c
Update comment
armanbilge Dec 12, 2022
721c2fc
Add test for pre-existing readiness
armanbilge Dec 12, 2022
4f9e57b
Add test for no readiness
armanbilge Dec 12, 2022
a520aee
Reimagine `FileDescriptorPoller`
armanbilge Dec 19, 2022
5f8146b
Fix parameter names
armanbilge Dec 19, 2022
3640605
Refactor/redesign `PollingSystem` ... again ... (:
armanbilge Dec 19, 2022
42491da
Dump `EventLoop` abstraction
armanbilge Dec 19, 2022
786127c
Update the `FileDescriptorPollerSpec`
armanbilge Dec 19, 2022
de3eea0
Rework `EpollSystem`
armanbilge Dec 20, 2022
eb8ba84
Set pipes to non-blocking mode
armanbilge Dec 20, 2022
0124567
Add fcntl import
armanbilge Dec 20, 2022
72b05a7
Fix bugs in spec
armanbilge Dec 20, 2022
d18fa76
Add some uncancelables
armanbilge Dec 20, 2022
4d3a916
Revert "Add some uncancelables"
armanbilge Dec 20, 2022
9ba870f
Rework `KqueueSystem`
armanbilge Dec 20, 2022
9673883
Post-refactor typos
armanbilge Dec 20, 2022
43b0b0a
Scope `.evalOn` even more tightly
armanbilge Dec 24, 2022
e5dd04f
Use `asyncCheckAttempt`
armanbilge Dec 24, 2022
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 8 additions & 0 deletions build.sbt
Original file line number Diff line number Diff line change
Expand Up @@ -772,6 +772,14 @@ lazy val core = crossProject(JSPlatform, JVMPlatform, NativePlatform)
} else Seq()
}
)
.nativeSettings(
mimaBinaryIssueFilters ++= Seq(
ProblemFilters.exclude[MissingClassProblem](
"cats.effect.unsafe.PollingExecutorScheduler$SleepTask"),
ProblemFilters.exclude[MissingClassProblem]("cats.effect.unsafe.QueueExecutorScheduler"),
ProblemFilters.exclude[MissingClassProblem]("cats.effect.unsafe.QueueExecutorScheduler$")
)
)

/**
* Test support for the core project, providing various helpful instances like ScalaCheck
Expand Down
24 changes: 18 additions & 6 deletions core/native/src/main/scala/cats/effect/IOApp.scala
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import cats.effect.metrics.NativeCpuStarvationMetrics

import scala.concurrent.CancellationException
import scala.concurrent.duration._
import scala.scalanative.meta.LinktimeInfo

/**
* The primary entry point to a Cats Effect application. Extend this trait rather than defining
Expand Down Expand Up @@ -165,6 +166,21 @@ trait IOApp {
*/
protected def runtimeConfig: unsafe.IORuntimeConfig = unsafe.IORuntimeConfig()

/**
* The [[unsafe.PollingSystem]] used by the [[runtime]] which will evaluate the [[IO]]
* produced by `run`. It is very unlikely that users will need to override this method.
*
* [[unsafe.PollingSystem]] implementors may provide their own flavors of [[IOApp]] that
* override this method.
*/
protected def pollingSystem: unsafe.PollingSystem =
if (LinktimeInfo.isLinux)
unsafe.EpollSystem(64)
else if (LinktimeInfo.isMac)
unsafe.KqueueSystem(64)
else
unsafe.SleepSystem
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We should… probably support Windows too someday :-(

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

No.

More seriously, I think an fs2-uv with a libuv-based polling system would be the least masochistic way to deal with this.


/**
* The entry point for your application. Will be called by the runtime when the process is
* started. If the underlying runtime supports it, any arguments passed to the process will be
Expand All @@ -186,12 +202,8 @@ trait IOApp {
import unsafe.IORuntime

val installed = IORuntime installGlobal {
IORuntime(
IORuntime.defaultComputeExecutionContext,
IORuntime.defaultComputeExecutionContext,
IORuntime.defaultScheduler,
() => (),
runtimeConfig)
val loop = IORuntime.createEventLoop(pollingSystem)
IORuntime(loop, loop, loop, () => (), runtimeConfig)
}

_runtime = IORuntime.global
Expand Down
10 changes: 10 additions & 0 deletions core/native/src/main/scala/cats/effect/IOCompanionPlatform.scala
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,9 @@
package cats.effect

import cats.effect.std.Console
import cats.effect.unsafe.EventLoop

import scala.reflect.ClassTag

import java.time.Instant

Expand Down Expand Up @@ -62,4 +65,11 @@ private[effect] abstract class IOCompanionPlatform { this: IO.type =>
*/
def readLine: IO[String] =
Console[IO].readLine

def eventLoop[Poller](implicit ct: ClassTag[Poller]): IO[Option[EventLoop[Poller]]] =
IO.executionContext.map {
case loop: EventLoop[_] if ct.runtimeClass.isInstance(loop.poller()) =>
Some(loop.asInstanceOf[EventLoop[Poller]])
case _ => None
}
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The poller is accessed like this.

}
159 changes: 159 additions & 0 deletions core/native/src/main/scala/cats/effect/unsafe/EpollSystem.scala
Original file line number Diff line number Diff line change
@@ -0,0 +1,159 @@
/*
* Copyright 2020-2022 Typelevel
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package cats.effect
package unsafe

import scala.scalanative.libc.errno._
import scala.scalanative.posix.string._
import scala.scalanative.posix.unistd
import scala.scalanative.unsafe._
import scala.scalanative.unsigned._
import scala.util.control.NonFatal

import java.io.IOException
import java.util.{Collections, IdentityHashMap, Set}

import EpollSystem.epoll._
import EpollSystem.epollImplicits._

final class EpollSystem private (maxEvents: Int) extends PollingSystem {

def makePoller(): Poller = {
val fd = epoll_create1(0)
if (fd == -1)
throw new IOException(fromCString(strerror(errno)))
new Poller(fd, maxEvents)
}

def close(poller: Poller): Unit = poller.close()

def poll(poller: Poller, nanos: Long, reportFailure: Throwable => Unit): Boolean =
poller.poll(nanos, reportFailure)

final class Poller private[EpollSystem] (private[EpollSystem] val epfd: Int, maxEvents: Int)
extends FileDescriptorPoller {

private[this] val callbacks: Set[FileDescriptorPoller.Callback] =
Collections.newSetFromMap(new IdentityHashMap)

private[EpollSystem] def close(): Unit =
if (unistd.close(epfd) != 0)
throw new IOException(fromCString(strerror(errno)))

private[EpollSystem] def poll(timeout: Long, reportFailure: Throwable => Unit): Boolean = {
val noCallbacks = callbacks.isEmpty()

if (timeout <= 0 && noCallbacks)
false // nothing to do here
else {
val timeoutMillis = if (timeout == -1) -1 else (timeout / 1000000).toInt

val events = stackalloc[epoll_event](maxEvents.toUInt)

val triggeredEvents = epoll_wait(epfd, events, maxEvents, timeoutMillis)

if (triggeredEvents >= 0) {
var i = 0
while (i < triggeredEvents) {
val event = events + i.toLong
val cb = FileDescriptorPoller.Callback.fromPtr(event.data)
try {
val e = event.events.toInt
val readReady = (e & EPOLLIN) != 0
val writeReady = (e & EPOLLOUT) != 0
cb.notifyFileDescriptorEvents(readReady, writeReady)
} catch {
case ex if NonFatal(ex) => reportFailure(ex)
}
i += 1
}
} else {
throw new IOException(fromCString(strerror(errno)))
}

!callbacks.isEmpty()
}
}

def registerFileDescriptor(fd: Int, reads: Boolean, writes: Boolean)(
cb: FileDescriptorPoller.Callback): Runnable = {
val event = stackalloc[epoll_event]()
event.events =
(EPOLLET | (if (reads) EPOLLIN else 0) | (if (writes) EPOLLOUT else 0)).toUInt
event.data = FileDescriptorPoller.Callback.toPtr(cb)

if (epoll_ctl(epfd, EPOLL_CTL_ADD, fd, event) != 0)
throw new IOException(fromCString(strerror(errno)))
callbacks.add(cb)

() => {
callbacks.remove(cb)
if (epoll_ctl(epfd, EPOLL_CTL_DEL, fd, null) != 0)
throw new IOException(fromCString(strerror(errno)))
}
}
}

}

object EpollSystem {
def apply(maxEvents: Int): EpollSystem = new EpollSystem(maxEvents)

@extern
private[unsafe] object epoll {

final val EPOLL_CTL_ADD = 1
final val EPOLL_CTL_DEL = 2
final val EPOLL_CTL_MOD = 3

final val EPOLLIN = 0x001
final val EPOLLOUT = 0x004
final val EPOLLONESHOT = 1 << 30
final val EPOLLET = 1 << 31

type epoll_event
type epoll_data_t = Ptr[Byte]

def epoll_create1(flags: Int): Int = extern

def epoll_ctl(epfd: Int, op: Int, fd: Int, event: Ptr[epoll_event]): Int = extern

def epoll_wait(epfd: Int, events: Ptr[epoll_event], maxevents: Int, timeout: Int): Int =
extern

}

private[unsafe] object epollImplicits {

implicit final class epoll_eventOps(epoll_event: Ptr[epoll_event]) {
def events: CUnsignedInt = !(epoll_event.asInstanceOf[Ptr[CUnsignedInt]])
def events_=(events: CUnsignedInt): Unit =
!(epoll_event.asInstanceOf[Ptr[CUnsignedInt]]) = events

def data: epoll_data_t =
!((epoll_event.asInstanceOf[Ptr[Byte]] + sizeof[CUnsignedInt])
.asInstanceOf[Ptr[epoll_data_t]])
def data_=(data: epoll_data_t): Unit =
!((epoll_event.asInstanceOf[Ptr[Byte]] + sizeof[CUnsignedInt])
.asInstanceOf[Ptr[epoll_data_t]]) = data
}

implicit val epoll_eventTag: Tag[epoll_event] =
Tag.materializeCArrayTag[Byte, Nat.Digit2[Nat._1, Nat._2]].asInstanceOf[Tag[epoll_event]]

}
}
26 changes: 26 additions & 0 deletions core/native/src/main/scala/cats/effect/unsafe/EventLoop.scala
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
/*
* Copyright 2020-2022 Typelevel
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package cats.effect
package unsafe

import scala.concurrent.ExecutionContext

trait EventLoop[Poller] extends ExecutionContext {

def poller(): Poller

}
Loading