Skip to content

Commit

Permalink
Backport whenAll[Succeed] to NIO 1 (#947)
Browse files Browse the repository at this point in the history
* Cherry-pick "Significantly improve the performance of `EventLoopFuture.{and,when}AllSucceed`. (#943)"

* NIO 1 fixes.
  • Loading branch information
MrMage authored and weissi committed Apr 8, 2019
1 parent f5ca03d commit 6b914e8
Show file tree
Hide file tree
Showing 4 changed files with 240 additions and 19 deletions.
129 changes: 110 additions & 19 deletions Sources/NIO/EventLoopFuture.swift
Original file line number Diff line number Diff line change
Expand Up @@ -862,24 +862,6 @@ extension EventLoopFuture {
}
return body
}
}

extension EventLoopFuture {
/// Returns a new `EventLoopFuture` that fires only when all the provided futures complete.
///
/// This extension is only available when you have a collection of `EventLoopFuture`s that do not provide
/// result data: that is, they are completion notifiers. In this case, you can wait for all of them. The
/// returned `EventLoopFuture` will fail as soon as any of the futures fails: otherwise, it will succeed
/// only when all of them do.
///
/// - parameters:
/// - futures: An array of `EventLoopFuture<Void>` to wait for.
/// - eventLoop: The `EventLoop` on which the new `EventLoopFuture` callbacks will fire.
/// - returns: A new `EventLoopFuture`.
public static func andAll(_ futures: [EventLoopFuture<Void>], eventLoop: EventLoop) -> EventLoopFuture<Void> {
let body = EventLoopFuture<Void>.reduce((), futures, eventLoop: eventLoop) { (_: (), _: ()) in }
return body
}

/// Returns a new `EventLoopFuture` that fires only when all the provided futures complete.
/// The new `EventLoopFuture` contains the result of reducing the `initialResult` with the
Expand Down Expand Up @@ -950,7 +932,116 @@ extension EventLoopFuture {
}
}

public extension EventLoopFuture {
// "fail fast" reduce
extension EventLoopFuture {
/// Returns a new `EventLoopFuture` that succeeds only if all of the provided futures succeed.
///
/// This method acts as a successful completion notifier - values fulfilled by each future are discarded.
///
/// The returned `EventLoopFuture` fails as soon as any of the provided futures fail.
///
/// If it is desired to always succeed, regardless of failures, use `andAllComplete` instead.
/// - Parameters:
/// - futures: An array of homogenous `EventLoopFutures`s to wait for.
/// - on: The `EventLoop` on which the new `EventLoopFuture` callbacks will execute on.
/// - Returns: A new `EventLoopFuture` that waits for the other futures to succeed.
public static func andAll(_ futures: [EventLoopFuture<T>], eventLoop: EventLoop) -> EventLoopFuture<Void> {
let promise = eventLoop.newPromise(of: Void.self)

if eventLoop.inEventLoop {
self._reduceSuccesses0(promise, futures, eventLoop, onValue: { _, _ in })
} else {
eventLoop.execute {
self._reduceSuccesses0(promise, futures, eventLoop, onValue: { _, _ in })
}
}

return promise.futureResult
}

/// Returns a new `EventLoopFuture` that succeeds only if all of the provided futures succeed.
/// The new `EventLoopFuture` will contain all of the values fulfilled by the futures.
///
/// The returned `EventLoopFuture` will fail as soon as any of the futures fails.
/// - Parameters:
/// - futures: An array of homogenous `EventLoopFuture`s to wait on for fulfilled values.
/// - on: The `EventLoop` on which the new `EventLoopFuture` callbacks will fire.
/// - Returns: A new `EventLoopFuture` with all of the values fulfilled by the provided futures.
public static func whenAll(_ futures: [EventLoopFuture<T>], eventLoop: EventLoop) -> EventLoopFuture<[T]> {
let promise = eventLoop.newPromise(of: Void.self)

var results: [T?] = .init(repeating: nil, count: futures.count)
let callback = { (index: Int, result: T) in
results[index] = result
}

if eventLoop.inEventLoop {
self._reduceSuccesses0(promise, futures, eventLoop, onValue: callback)
} else {
eventLoop.execute {
self._reduceSuccesses0(promise, futures, eventLoop, onValue: callback)
}
}

return promise.futureResult.map {
// verify that all operations have been completed
assert(!results.contains(where: { $0 == nil }))
return results.map { $0! }
}
}

/// Loops through the futures array and attaches callbacks to execute `onValue` on the provided `EventLoop` when
/// they succeed. The `onValue` will receive the index of the future that fulfilled the provided `Result`.
///
/// Once all the futures have succeed, the provided promise will succeed.
/// Once any future fails, the provided promise will fail.
private static func _reduceSuccesses0<InputValue>(_ promise: EventLoopPromise<Void>,
_ futures: [EventLoopFuture<InputValue>],
_ eventLoop: EventLoop,
onValue: @escaping (Int, InputValue) -> Void) {
eventLoop.assertInEventLoop()

var remainingCount = futures.count

if remainingCount == 0 {
promise.succeed(result: ())
return
}

// Sends the result to `onValue` in case of success and succeeds/fails the input promise, if appropriate.
func processResult(_ index: Int, _ result: EventLoopFutureValue<InputValue>) {
switch result {
case .success(let result):
onValue(index, result)
remainingCount -= 1

if remainingCount == 0 {
promise.succeed(result: ())
}
case .failure(let error):
promise.fail(error: error)
}
}
// loop through the futures to chain callbacks to execute on the initiating event loop and grab their index
// in the "futures" to pass their result to the caller
for (index, future) in futures.enumerated() {
if future.eventLoop.inEventLoop,
let result = future.value {
// Fast-track already-fulfilled results without the overhead of calling `whenComplete`. This can yield a
// ~20% performance improvement in the case of large arrays where all elements are already fulfilled.
processResult(index, result)
if case .failure = result {
return // Once the promise is failed, future results do not need to be processed.
}
} else {
future.hopTo(eventLoop: eventLoop)
._whenCompleteWithValue { result in processResult(index, result) }
}
}
}
}

extension EventLoopFuture {

This comment has been minimized.

Copy link
@ffried

ffried Apr 11, 2019

Contributor

Together with 26bee21 this breaks builds using hopTo(eventLoop:) since it's now no longer public because neither the extension is public now, nor the hopTo(eventLoop:) definition itself.

This comment has been minimized.

Copy link
@ffried

ffried Apr 11, 2019

Contributor

See #964

/// Returns an `EventLoopFuture` that fires when this future completes, but executes its callbacks on the
/// target event loop instead of the original one.
///
Expand Down
43 changes: 43 additions & 0 deletions Sources/NIOPerformanceTester/main.swift
Original file line number Diff line number Diff line change
Expand Up @@ -614,3 +614,46 @@ measureAndPrint(desc: "http1_10k_reqs_100_conns") {
}
return reqs.reduce(0, +) / reqsPerConn
}

measureAndPrint(desc: "future_whenall_100k_immediately_succeeded_off_loop") {
let loop = group.next()
let expected = Array(0..<100_000)
let futures = expected.map { loop.newSucceededFuture(result: $0) }
let allSucceeded = try! EventLoopFuture.whenAll(futures, eventLoop: loop).wait()
return allSucceeded.count
}

measureAndPrint(desc: "future_whenall_100k_immediately_succeeded_on_loop") {
let loop = group.next()
let expected = Array(0..<100_000)
let allSucceeded = try! loop.newSucceededFuture(result: ()).then { _ -> EventLoopFuture<[Int]> in
let futures = expected.map { loop.newSucceededFuture(result: $0) }
return EventLoopFuture.whenAll(futures, eventLoop: loop)
}.wait()
return allSucceeded.count
}

measureAndPrint(desc: "future_whenall_100k_deferred_off_loop") {
let loop = group.next()
let expected = Array(0..<100_000)
let promises = expected.map { _ in loop.newPromise(of: Int.self) }
let allSucceeded = EventLoopFuture.whenAll(promises.map { $0.futureResult }, eventLoop: loop)
for (index, promise) in promises.enumerated() {
promise.succeed(result: index)
}
return try! allSucceeded.wait().count
}

measureAndPrint(desc: "future_whenall_100k_deferred_on_loop") {
let loop = group.next()
let expected = Array(0..<100_000)
let promises = expected.map { _ in loop.newPromise(of: Int.self) }
let allSucceeded = try! loop.newSucceededFuture(result: ()).then { _ -> EventLoopFuture<[Int]> in
let result = EventLoopFuture.whenAll(promises.map { $0.futureResult }, eventLoop: loop)
for (index, promise) in promises.enumerated() {
promise.succeed(result: index)
}
return result
}.wait()
return allSucceeded.count
}
3 changes: 3 additions & 0 deletions Tests/NIOTests/EventLoopFutureTest+XCTest.swift
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,9 @@ extension EventLoopFutureTest {
("testLoopHoppingHelperSuccess", testLoopHoppingHelperSuccess),
("testLoopHoppingHelperFailure", testLoopHoppingHelperFailure),
("testLoopHoppingHelperNoHopping", testLoopHoppingHelperNoHopping),
("testWhenAllFailsImmediately", testWhenAllFailsImmediately),
("testWhenAllResolvesAfterFutures", testWhenAllResolvesAfterFutures),
("testWhenAllIsIndependentOfFulfillmentOrder", testWhenAllIsIndependentOfFulfillmentOrder),
]
}
}
Expand Down
84 changes: 84 additions & 0 deletions Tests/NIOTests/EventLoopFutureTest.swift
Original file line number Diff line number Diff line change
Expand Up @@ -828,4 +828,88 @@ class EventLoopFutureTest : XCTestCase {
XCTAssertTrue(noHoppingFuture === noHoppingPromise.futureResult)
noHoppingPromise.succeed(result: ())
}

func testWhenAllFailsImmediately() {
let group = MultiThreadedEventLoopGroup(numberOfThreads: 2)
defer {
XCTAssertNoThrow(try group.syncShutdownGracefully())
}

let promises = [group.next().newPromise(of: Int.self),
group.next().newPromise(of: Int.self)]
let future = EventLoopFuture.whenAll(promises.map { $0.futureResult }, eventLoop: group.next())
promises[0].fail(error: EventLoopFutureTestError.example)
XCTAssertThrowsError(try future.wait()) { error in
XCTAssert(type(of: error) == EventLoopFutureTestError.self)
}
}

func testWhenAllResolvesAfterFutures() throws {
let group = MultiThreadedEventLoopGroup(numberOfThreads: 6)
defer {
XCTAssertNoThrow(try group.syncShutdownGracefully())
}

let promises = (0..<5).map { _ in group.next().newPromise(of: Int.self) }
let futures = promises.map { $0.futureResult }

var succeeded = false
var completedPromises = false

let mainFuture = EventLoopFuture.whenAll(futures, eventLoop: group.next())
mainFuture.whenSuccess { _ in
XCTAssertTrue(completedPromises)
XCTAssertFalse(succeeded)
succeeded = true
}

// Should be false, as none of the promises have completed yet
XCTAssertFalse(succeeded)

// complete the first four promises
for (index, promise) in promises.dropLast().enumerated() {
promise.succeed(result: index)
}

// Should still be false, as one promise hasn't completed yet
XCTAssertFalse(succeeded)

// Complete the last promise
completedPromises = true
promises.last!.succeed(result: 4)

let results = try assertNoThrowWithValue(mainFuture.wait())
XCTAssertEqual(results, [0, 1, 2, 3, 4])
}

func testWhenAllIsIndependentOfFulfillmentOrder() throws {
let group = MultiThreadedEventLoopGroup(numberOfThreads: 6)
defer {
XCTAssertNoThrow(try group.syncShutdownGracefully())
}

let expected = Array(0..<1000)
let promises = expected.map { _ in group.next().newPromise(of: Int.self) }
let futures = promises.map { $0.futureResult }

var succeeded = false
var completedPromises = false

let mainFuture = EventLoopFuture.whenAll(futures, eventLoop: group.next())
mainFuture.whenSuccess { _ in
XCTAssertTrue(completedPromises)
XCTAssertFalse(succeeded)
succeeded = true
}

for index in expected.reversed() {
if index == 0 {
completedPromises = true
}
promises[index].succeed(result: index)
}

let results = try assertNoThrowWithValue(mainFuture.wait())
XCTAssertEqual(results, expected)
}
}

0 comments on commit 6b914e8

Please sign in to comment.