Skip to content

Commit

Permalink
Selector: make sure timerfd is level triggered (#881)
Browse files Browse the repository at this point in the history
Motivation:

In the Selector (because of the deregistrationsHappened workaround) we
expect all events to be level triggered. If there are deregistrations,
we just expect epoll/kqueue to deliver the event again. But timerfd was
configured as `EPOLLET`. That means is a timer fires after some
deregistrations, we lose it.

Modifications:

make timerfd level triggered

Result:

- no more timer stalls
- fixes #872

(cherry picked from commit c8ac593)
  • Loading branch information
weissi authored and Lukasa committed Mar 8, 2019
1 parent 3bd0fc5 commit 29a9f2a
Show file tree
Hide file tree
Showing 3 changed files with 70 additions and 2 deletions.
4 changes: 2 additions & 2 deletions Sources/NIO/Selector.swift
Original file line number Diff line number Diff line change
Expand Up @@ -300,7 +300,7 @@ final class Selector<R: Registration> {
try Epoll.epoll_ctl(epfd: self.fd, op: Epoll.EPOLL_CTL_ADD, fd: eventfd, event: &ev)

var timerev = Epoll.epoll_event()
timerev.events = Epoll.EPOLLIN | Epoll.EPOLLERR | Epoll.EPOLLRDHUP | Epoll.EPOLLET
timerev.events = Epoll.EPOLLIN | Epoll.EPOLLERR | Epoll.EPOLLRDHUP
timerev.data.fd = timerfd
try Epoll.epoll_ctl(epfd: self.fd, op: Epoll.EPOLL_CTL_ADD, fd: timerfd, event: &timerev)
#else
Expand Down Expand Up @@ -488,7 +488,7 @@ final class Selector<R: Registration> {
case .now:
ready = Int(try Epoll.epoll_wait(epfd: self.fd, events: events, maxevents: Int32(eventsCapacity), timeout: 0))
case .blockUntilTimeout(let timeAmount):
// Only call timerfd_settime if we not already scheduled one that will cover it.
// Only call timerfd_settime if we're not already scheduled one that will cover it.
// This guards against calling timerfd_settime if not needed as this is generally speaking
// expensive.
let next = DispatchTime.now().uptimeNanoseconds + UInt64(timeAmount.nanoseconds)
Expand Down
1 change: 1 addition & 0 deletions Tests/NIOTests/SelectorTest+XCTest.swift
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ extension SelectorTest {
("testDeregisterWhileProcessingEvents", testDeregisterWhileProcessingEvents),
("testDeregisterAndCloseWhileProcessingEvents", testDeregisterAndCloseWhileProcessingEvents),
("testWeDoNotDeliverEventsForPreviouslyClosedChannels", testWeDoNotDeliverEventsForPreviouslyClosedChannels),
("testTimerFDIsLevelTriggered", testTimerFDIsLevelTriggered),
]
}
}
Expand Down
67 changes: 67 additions & 0 deletions Tests/NIOTests/SelectorTest.swift
Original file line number Diff line number Diff line change
Expand Up @@ -364,4 +364,71 @@ class SelectorTest: XCTestCase {
XCTAssertNoThrow(try everythingWasReadPromise.futureResult.wait())
XCTAssertNoThrow(try FileManager.default.removeItem(at: URL(fileURLWithPath: tempDir)))
}

func testTimerFDIsLevelTriggered() throws {
// this is a regression test for https://github.com/apple/swift-nio/issues/872
let delayToUseInMicroSeconds: Int64 = 100_000 // needs to be much greater than time it takes to EL.execute

let group = MultiThreadedEventLoopGroup(numberOfThreads: 1)
defer {
XCTAssertNoThrow(try group.syncShutdownGracefully())
}
class FakeSocket: Socket {
private let hasBeenClosedPromise: EventLoopPromise<Void>
init(hasBeenClosedPromise: EventLoopPromise<Void>, descriptor: CInt) {
self.hasBeenClosedPromise = hasBeenClosedPromise
super.init(descriptor: descriptor)
}
override func close() throws {
self.hasBeenClosedPromise.succeed(result: ())
try super.close()
}
}
var socketFDs: [CInt] = [-1, -1]
#if os(macOS)
let err = socketpair(PF_LOCAL, SOCK_STREAM, 0, &socketFDs)
#else
let err = socketpair(PF_LOCAL, CInt(SOCK_STREAM.rawValue), 0, &socketFDs)
#endif
XCTAssertEqual(0, err)

let numberFires = Atomic<Int>(value: 0)
let el = group.next() as! SelectableEventLoop
let channelHasBeenClosedPromise = el.newPromise(of: Void.self)
let channel = try SocketChannel(socket: FakeSocket(hasBeenClosedPromise: channelHasBeenClosedPromise,
descriptor: socketFDs[0]), eventLoop: el)
let sched = el.scheduleRepeatedTask(initialDelay: .microseconds(TimeAmount.Value(delayToUseInMicroSeconds)),
delay: .microseconds(TimeAmount.Value(delayToUseInMicroSeconds))) { (_: RepeatedTask) in
_ = numberFires.add(1)
}
XCTAssertNoThrow(try el.submit {
// EL tick 1: this is used to
// - actually arm the timer (timerfd_settime)
// - set the channel restration up
if numberFires.load() > 0 {
print("WARNING: This test hit a race and this result doesn't mean it actually worked." +
" This should really only ever happen in very bizarre conditions.")
}
channel.interestedEvent = [.readEOF, .reset]
func workaroundSR9815() {
channel.registerAlreadyConfigured0(promise: nil)
}
workaroundSR9815()
}.wait())
usleep(10_000) // this makes this repro very stable
el.execute {
// EL tick 2: this is used to
// - close one end of the socketpair so that in EL tick 3, we'll see a EPOLLHUP
// - sleep `delayToUseInMicroSeconds + 10` so in EL tick 3, we'll also see timerfd fire
close(socketFDs[1])
usleep(.init(delayToUseInMicroSeconds))
}

// EL tick 3: happens in the background here. We will likely lose the timer signal because of the
// `deregistrationsHappened` workaround in `Selector.swift` and we expect to pick it up again when we enter
// `epoll_wait`/`kevent` next. This however only works if the timer event is level triggered.
assert(numberFires.load() > 5, within: .seconds(1), "timer only fired \(numberFires.load()) times")
sched.cancel()
XCTAssertNoThrow(try channelHasBeenClosedPromise.futureResult.wait())
}
}

0 comments on commit 29a9f2a

Please sign in to comment.