Skip to content

Commit

Permalink
Significantly improve the performance of `EventLoopFuture.{and,when}A…
Browse files Browse the repository at this point in the history
…llSucceed`.
  • Loading branch information
MrMage committed Apr 3, 2019
1 parent df4e078 commit 486d648
Show file tree
Hide file tree
Showing 3 changed files with 259 additions and 2 deletions.
90 changes: 88 additions & 2 deletions Sources/NIO/EventLoopFuture.swift
Original file line number Diff line number Diff line change
Expand Up @@ -1032,7 +1032,17 @@ extension EventLoopFuture {
/// - on: The `EventLoop` on which the new `EventLoopFuture` callbacks will execute on.
/// - Returns: A new `EventLoopFuture` that waits for the other futures to succeed.
public static func andAllSucceed(_ futures: [EventLoopFuture<Value>], on eventLoop: EventLoop) -> EventLoopFuture<Void> {
return .reduce((), futures, on: eventLoop) { (_: (), _: Value) in }
let promise = eventLoop.makePromise(of: Void.self)

if eventLoop.inEventLoop {
self._reduceSuccesses0(promise, futures, eventLoop, onResult: { _, _ in })
} else {
eventLoop.execute {
self._reduceSuccesses0(promise, futures, eventLoop, onResult: { _, _ in })
}
}

return promise.futureResult
}

/// Returns a new `EventLoopFuture` that succeeds only if all of the provided futures succeed.
Expand All @@ -1044,7 +1054,83 @@ extension EventLoopFuture {
/// - on: The `EventLoop` on which the new `EventLoopFuture` callbacks will fire.
/// - Returns: A new `EventLoopFuture` with all of the values fulfilled by the provided futures.
public static func whenAllSucceed(_ futures: [EventLoopFuture<Value>], on eventLoop: EventLoop) -> EventLoopFuture<[Value]> {
return .reduce(into: [], futures, on: eventLoop) { (results, value) in results.append(value) }
let promise = eventLoop.makePromise(of: Void.self)

var results: [Value?] = .init(repeating: nil, count: futures.count)
let callback = { (index: Int, result: Value) in
results[index] = result
}

if eventLoop.inEventLoop {
self._reduceSuccesses0(promise, futures, eventLoop, onResult: callback)
} else {
eventLoop.execute {
self._reduceSuccesses0(promise, futures, eventLoop, onResult: callback)
}
}

return promise.futureResult.map {
// verify that all operations have been completed
assert(!results.contains(where: { $0 == nil }))
return results.map { $0! }
}
}

/// Loops through the futures array and attaches callbacks to execute `onResult` on the provided `EventLoop` when
/// they succeed. The `onResult` will receive the index of the future that fulfilled the provided `Result`.
///
/// Once all the futures have succeed, the provided promise will succeed.
/// Once any future fails, the provided promise will fail.
private static func _reduceSuccesses0<InputValue>(_ promise: EventLoopPromise<Void>,
_ futures: [EventLoopFuture<InputValue>],
_ eventLoop: EventLoop,
onResult: @escaping (Int, InputValue) -> Void) {
eventLoop.assertInEventLoop()

var remainingCount = futures.count

if remainingCount == 0 {
promise.succeed(())
return
}

// loop through the futures to chain callbacks to execute on the initiating event loop and grab their index
// in the "futures" to pass their result to the caller
for (index, future) in futures.enumerated() {
if future.eventLoop === eventLoop,
let result = future._value {
// Fast-track already-fulfilled results without the overhead of calling `whenComplete`. This can yield a
// ~20% performance improvement in the case of large arrays where all elements are already fulfilled.
switch result {
case .success(let result):
onResult(index, result)
remainingCount -= 1

if remainingCount == 0 {
promise.succeed(())
}

case .failure(let error):
promise.fail(error)
return // Once the promise is failed, future results do not need to be processed.
}
} else {
future.hop(to: eventLoop)
.whenComplete { result in
switch result {
case .success(let result):
onResult(index, result)
remainingCount -= 1

if remainingCount == 0 {
promise.succeed(())
}

case .failure(let error): promise.fail(error)
}
}
}
}
}
}

Expand Down
87 changes: 87 additions & 0 deletions Sources/NIOPerformanceTester/main.swift
Original file line number Diff line number Diff line change
Expand Up @@ -614,3 +614,90 @@ measureAndPrint(desc: "http1_10k_reqs_100_conns") {
}
return reqs.reduce(0, +) / reqsPerConn
}

measureAndPrint(desc: "future_whenallsucceed_100k_immediately_succeeded_off_loop") {
let loop = group.next()
let expected = Array(0..<100_000)
let futures = expected.map { loop.makeSucceededFuture($0) }
let allSucceeded = try! EventLoopFuture.whenAllSucceed(futures, on: loop).wait()
return allSucceeded.count
}

measureAndPrint(desc: "future_whenallsucceed_100k_immediately_succeeded_on_loop") {
let loop = group.next()
let expected = Array(0..<100_000)
let allSucceeded = try! loop.makeSucceededFuture(()).flatMap { _ -> EventLoopFuture<[Int]> in
let futures = expected.map { loop.makeSucceededFuture($0) }
return EventLoopFuture.whenAllSucceed(futures, on: loop)
}.wait()
return allSucceeded.count
}

measureAndPrint(desc: "future_whenallsucceed_100k_deferred_off_loop") {
let loop = group.next()
let expected = Array(0..<100_000)
let promises = expected.map { _ in loop.makePromise(of: Int.self) }
let allSucceeded = EventLoopFuture.whenAllSucceed(promises.map { $0.futureResult }, on: loop)
for (index, promise) in promises.enumerated() {
promise.succeed(index)
}
return try! allSucceeded.wait().count
}

measureAndPrint(desc: "future_whenallsucceed_100k_deferred_on_loop") {
let loop = group.next()
let expected = Array(0..<100_000)
let promises = expected.map { _ in loop.makePromise(of: Int.self) }
let allSucceeded = try! loop.makeSucceededFuture(()).flatMap { _ -> EventLoopFuture<[Int]> in
let result = EventLoopFuture.whenAllSucceed(promises.map { $0.futureResult }, on: loop)
for (index, promise) in promises.enumerated() {
promise.succeed(index)
}
return result
}.wait()
return allSucceeded.count
}


measureAndPrint(desc: "future_whenallcomplete_100k_immediately_succeeded_off_loop") {
let loop = group.next()
let expected = Array(0..<100_000)
let futures = expected.map { loop.makeSucceededFuture($0) }
let allSucceeded = try! EventLoopFuture.whenAllComplete(futures, on: loop).wait()
return allSucceeded.count
}

measureAndPrint(desc: "future_whenallcomplete_100k_immediately_succeeded_on_loop") {
let loop = group.next()
let expected = Array(0..<100_000)
let allSucceeded = try! loop.makeSucceededFuture(()).flatMap { _ -> EventLoopFuture<[Result<Int, Error>]> in
let futures = expected.map { loop.makeSucceededFuture($0) }
return EventLoopFuture.whenAllComplete(futures, on: loop)
}.wait()
return allSucceeded.count
}

measureAndPrint(desc: "future_whenallcomplete_100k_deferred_off_loop") {
let loop = group.next()
let expected = Array(0..<100_000)
let promises = expected.map { _ in loop.makePromise(of: Int.self) }
let allSucceeded = EventLoopFuture.whenAllComplete(promises.map { $0.futureResult }, on: loop)
for (index, promise) in promises.enumerated() {
promise.succeed(index)
}
return try! allSucceeded.wait().count
}

measureAndPrint(desc: "future_whenallcomplete_100k_deferred_on_loop") {
let loop = group.next()
let expected = Array(0..<100_000)
let promises = expected.map { _ in loop.makePromise(of: Int.self) }
let allSucceeded = try! loop.makeSucceededFuture(()).flatMap { _ -> EventLoopFuture<[Result<Int, Error>]> in
let result = EventLoopFuture.whenAllComplete(promises.map { $0.futureResult }, on: loop)
for (index, promise) in promises.enumerated() {
promise.succeed(index)
}
return result
}.wait()
return allSucceeded.count
}
84 changes: 84 additions & 0 deletions Tests/NIOTests/EventLoopFutureTest.swift
Original file line number Diff line number Diff line change
Expand Up @@ -860,6 +860,90 @@ class EventLoopFutureTest : XCTestCase {
}
}

func testWhenAllSucceedFailsImmediately() {
let group = MultiThreadedEventLoopGroup(numberOfThreads: 2)
defer {
XCTAssertNoThrow(try group.syncShutdownGracefully())
}

let promises = [group.next().makePromise(of: Int.self),
group.next().makePromise(of: Int.self)]
let future = EventLoopFuture.whenAllSucceed(promises.map { $0.futureResult }, on: group.next())
promises[0].fail(EventLoopFutureTestError.example)
XCTAssertThrowsError(try future.wait()) { error in
XCTAssert(type(of: error) == EventLoopFutureTestError.self)
}
}

func testWhenAllSucceedResolvesAfterFutures() throws {
let group = MultiThreadedEventLoopGroup(numberOfThreads: 6)
defer {
XCTAssertNoThrow(try group.syncShutdownGracefully())
}

let promises = (0..<5).map { _ in group.next().makePromise(of: Int.self) }
let futures = promises.map { $0.futureResult }

var succeeded = false
var completedPromises = false

let mainFuture = EventLoopFuture.whenAllSucceed(futures, on: group.next())
mainFuture.whenSuccess { _ in
XCTAssertTrue(completedPromises)
XCTAssertFalse(succeeded)
succeeded = true
}

// Should be false, as none of the promises have completed yet
XCTAssertFalse(succeeded)

// complete the first four promises
for (index, promise) in promises.dropLast().enumerated() {
promise.succeed(index)
}

// Should still be false, as one promise hasn't completed yet
XCTAssertFalse(succeeded)

// Complete the last promise
completedPromises = true
promises.last!.succeed(4)

let results = try assertNoThrowWithValue(mainFuture.wait())
XCTAssertEqual(results, [0, 1, 2, 3, 4])
}

func testWhenAllSucceedIsIndependentOfFulfillmentOrder() throws {
let group = MultiThreadedEventLoopGroup(numberOfThreads: 6)
defer {
XCTAssertNoThrow(try group.syncShutdownGracefully())
}

let expected = Array(0..<1000)
let promises = expected.map { _ in group.next().makePromise(of: Int.self) }
let futures = promises.map { $0.futureResult }

var succeeded = false
var completedPromises = false

let mainFuture = EventLoopFuture.whenAllSucceed(futures, on: group.next())
mainFuture.whenSuccess { _ in
XCTAssertTrue(completedPromises)
XCTAssertFalse(succeeded)
succeeded = true
}

for index in expected.reversed() {
if index == 0 {
completedPromises = true
}
promises[index].succeed(index)
}

let results = try assertNoThrowWithValue(mainFuture.wait())
XCTAssertEqual(results, expected)
}

func testWhenAllCompleteResultsWithFailuresStillSucceed() {
let group = MultiThreadedEventLoopGroup(numberOfThreads: 2)
defer {
Expand Down

0 comments on commit 486d648

Please sign in to comment.