From 6b914e891f6c30f332b14dc15bcc78a1985942f5 Mon Sep 17 00:00:00 2001 From: Daniel Alm Date: Mon, 8 Apr 2019 23:56:28 +0200 Subject: [PATCH] Backport `whenAll[Succeed]` to NIO 1 (#947) * Cherry-pick "Significantly improve the performance of `EventLoopFuture.{and,when}AllSucceed`. (#943)" * NIO 1 fixes. --- Sources/NIO/EventLoopFuture.swift | 129 +++++++++++++++--- Sources/NIOPerformanceTester/main.swift | 43 ++++++ .../NIOTests/EventLoopFutureTest+XCTest.swift | 3 + Tests/NIOTests/EventLoopFutureTest.swift | 84 ++++++++++++ 4 files changed, 240 insertions(+), 19 deletions(-) diff --git a/Sources/NIO/EventLoopFuture.swift b/Sources/NIO/EventLoopFuture.swift index 69c9e3e690..9c3fa0a550 100644 --- a/Sources/NIO/EventLoopFuture.swift +++ b/Sources/NIO/EventLoopFuture.swift @@ -862,24 +862,6 @@ extension EventLoopFuture { } return body } -} - -extension EventLoopFuture { - /// Returns a new `EventLoopFuture` that fires only when all the provided futures complete. - /// - /// This extension is only available when you have a collection of `EventLoopFuture`s that do not provide - /// result data: that is, they are completion notifiers. In this case, you can wait for all of them. The - /// returned `EventLoopFuture` will fail as soon as any of the futures fails: otherwise, it will succeed - /// only when all of them do. - /// - /// - parameters: - /// - futures: An array of `EventLoopFuture` to wait for. - /// - eventLoop: The `EventLoop` on which the new `EventLoopFuture` callbacks will fire. - /// - returns: A new `EventLoopFuture`. - public static func andAll(_ futures: [EventLoopFuture], eventLoop: EventLoop) -> EventLoopFuture { - let body = EventLoopFuture.reduce((), futures, eventLoop: eventLoop) { (_: (), _: ()) in } - return body - } /// Returns a new `EventLoopFuture` that fires only when all the provided futures complete. /// The new `EventLoopFuture` contains the result of reducing the `initialResult` with the @@ -950,7 +932,116 @@ extension EventLoopFuture { } } -public extension EventLoopFuture { +// "fail fast" reduce +extension EventLoopFuture { + /// Returns a new `EventLoopFuture` that succeeds only if all of the provided futures succeed. + /// + /// This method acts as a successful completion notifier - values fulfilled by each future are discarded. + /// + /// The returned `EventLoopFuture` fails as soon as any of the provided futures fail. + /// + /// If it is desired to always succeed, regardless of failures, use `andAllComplete` instead. + /// - Parameters: + /// - futures: An array of homogenous `EventLoopFutures`s to wait for. + /// - on: The `EventLoop` on which the new `EventLoopFuture` callbacks will execute on. + /// - Returns: A new `EventLoopFuture` that waits for the other futures to succeed. + public static func andAll(_ futures: [EventLoopFuture], eventLoop: EventLoop) -> EventLoopFuture { + let promise = eventLoop.newPromise(of: Void.self) + + if eventLoop.inEventLoop { + self._reduceSuccesses0(promise, futures, eventLoop, onValue: { _, _ in }) + } else { + eventLoop.execute { + self._reduceSuccesses0(promise, futures, eventLoop, onValue: { _, _ in }) + } + } + + return promise.futureResult + } + + /// Returns a new `EventLoopFuture` that succeeds only if all of the provided futures succeed. + /// The new `EventLoopFuture` will contain all of the values fulfilled by the futures. + /// + /// The returned `EventLoopFuture` will fail as soon as any of the futures fails. + /// - Parameters: + /// - futures: An array of homogenous `EventLoopFuture`s to wait on for fulfilled values. + /// - on: The `EventLoop` on which the new `EventLoopFuture` callbacks will fire. + /// - Returns: A new `EventLoopFuture` with all of the values fulfilled by the provided futures. + public static func whenAll(_ futures: [EventLoopFuture], eventLoop: EventLoop) -> EventLoopFuture<[T]> { + let promise = eventLoop.newPromise(of: Void.self) + + var results: [T?] = .init(repeating: nil, count: futures.count) + let callback = { (index: Int, result: T) in + results[index] = result + } + + if eventLoop.inEventLoop { + self._reduceSuccesses0(promise, futures, eventLoop, onValue: callback) + } else { + eventLoop.execute { + self._reduceSuccesses0(promise, futures, eventLoop, onValue: callback) + } + } + + return promise.futureResult.map { + // verify that all operations have been completed + assert(!results.contains(where: { $0 == nil })) + return results.map { $0! } + } + } + + /// Loops through the futures array and attaches callbacks to execute `onValue` on the provided `EventLoop` when + /// they succeed. The `onValue` will receive the index of the future that fulfilled the provided `Result`. + /// + /// Once all the futures have succeed, the provided promise will succeed. + /// Once any future fails, the provided promise will fail. + private static func _reduceSuccesses0(_ promise: EventLoopPromise, + _ futures: [EventLoopFuture], + _ eventLoop: EventLoop, + onValue: @escaping (Int, InputValue) -> Void) { + eventLoop.assertInEventLoop() + + var remainingCount = futures.count + + if remainingCount == 0 { + promise.succeed(result: ()) + return + } + + // Sends the result to `onValue` in case of success and succeeds/fails the input promise, if appropriate. + func processResult(_ index: Int, _ result: EventLoopFutureValue) { + switch result { + case .success(let result): + onValue(index, result) + remainingCount -= 1 + + if remainingCount == 0 { + promise.succeed(result: ()) + } + case .failure(let error): + promise.fail(error: error) + } + } + // loop through the futures to chain callbacks to execute on the initiating event loop and grab their index + // in the "futures" to pass their result to the caller + for (index, future) in futures.enumerated() { + if future.eventLoop.inEventLoop, + let result = future.value { + // Fast-track already-fulfilled results without the overhead of calling `whenComplete`. This can yield a + // ~20% performance improvement in the case of large arrays where all elements are already fulfilled. + processResult(index, result) + if case .failure = result { + return // Once the promise is failed, future results do not need to be processed. + } + } else { + future.hopTo(eventLoop: eventLoop) + ._whenCompleteWithValue { result in processResult(index, result) } + } + } + } +} + +extension EventLoopFuture { /// Returns an `EventLoopFuture` that fires when this future completes, but executes its callbacks on the /// target event loop instead of the original one. /// diff --git a/Sources/NIOPerformanceTester/main.swift b/Sources/NIOPerformanceTester/main.swift index 0ed8321106..9fef60e62d 100644 --- a/Sources/NIOPerformanceTester/main.swift +++ b/Sources/NIOPerformanceTester/main.swift @@ -614,3 +614,46 @@ measureAndPrint(desc: "http1_10k_reqs_100_conns") { } return reqs.reduce(0, +) / reqsPerConn } + +measureAndPrint(desc: "future_whenall_100k_immediately_succeeded_off_loop") { + let loop = group.next() + let expected = Array(0..<100_000) + let futures = expected.map { loop.newSucceededFuture(result: $0) } + let allSucceeded = try! EventLoopFuture.whenAll(futures, eventLoop: loop).wait() + return allSucceeded.count +} + +measureAndPrint(desc: "future_whenall_100k_immediately_succeeded_on_loop") { + let loop = group.next() + let expected = Array(0..<100_000) + let allSucceeded = try! loop.newSucceededFuture(result: ()).then { _ -> EventLoopFuture<[Int]> in + let futures = expected.map { loop.newSucceededFuture(result: $0) } + return EventLoopFuture.whenAll(futures, eventLoop: loop) + }.wait() + return allSucceeded.count +} + +measureAndPrint(desc: "future_whenall_100k_deferred_off_loop") { + let loop = group.next() + let expected = Array(0..<100_000) + let promises = expected.map { _ in loop.newPromise(of: Int.self) } + let allSucceeded = EventLoopFuture.whenAll(promises.map { $0.futureResult }, eventLoop: loop) + for (index, promise) in promises.enumerated() { + promise.succeed(result: index) + } + return try! allSucceeded.wait().count +} + +measureAndPrint(desc: "future_whenall_100k_deferred_on_loop") { + let loop = group.next() + let expected = Array(0..<100_000) + let promises = expected.map { _ in loop.newPromise(of: Int.self) } + let allSucceeded = try! loop.newSucceededFuture(result: ()).then { _ -> EventLoopFuture<[Int]> in + let result = EventLoopFuture.whenAll(promises.map { $0.futureResult }, eventLoop: loop) + for (index, promise) in promises.enumerated() { + promise.succeed(result: index) + } + return result + }.wait() + return allSucceeded.count +} diff --git a/Tests/NIOTests/EventLoopFutureTest+XCTest.swift b/Tests/NIOTests/EventLoopFutureTest+XCTest.swift index b2de1d9685..3af5d3f974 100644 --- a/Tests/NIOTests/EventLoopFutureTest+XCTest.swift +++ b/Tests/NIOTests/EventLoopFutureTest+XCTest.swift @@ -62,6 +62,9 @@ extension EventLoopFutureTest { ("testLoopHoppingHelperSuccess", testLoopHoppingHelperSuccess), ("testLoopHoppingHelperFailure", testLoopHoppingHelperFailure), ("testLoopHoppingHelperNoHopping", testLoopHoppingHelperNoHopping), + ("testWhenAllFailsImmediately", testWhenAllFailsImmediately), + ("testWhenAllResolvesAfterFutures", testWhenAllResolvesAfterFutures), + ("testWhenAllIsIndependentOfFulfillmentOrder", testWhenAllIsIndependentOfFulfillmentOrder), ] } } diff --git a/Tests/NIOTests/EventLoopFutureTest.swift b/Tests/NIOTests/EventLoopFutureTest.swift index f901483eda..d288319552 100644 --- a/Tests/NIOTests/EventLoopFutureTest.swift +++ b/Tests/NIOTests/EventLoopFutureTest.swift @@ -828,4 +828,88 @@ class EventLoopFutureTest : XCTestCase { XCTAssertTrue(noHoppingFuture === noHoppingPromise.futureResult) noHoppingPromise.succeed(result: ()) } + + func testWhenAllFailsImmediately() { + let group = MultiThreadedEventLoopGroup(numberOfThreads: 2) + defer { + XCTAssertNoThrow(try group.syncShutdownGracefully()) + } + + let promises = [group.next().newPromise(of: Int.self), + group.next().newPromise(of: Int.self)] + let future = EventLoopFuture.whenAll(promises.map { $0.futureResult }, eventLoop: group.next()) + promises[0].fail(error: EventLoopFutureTestError.example) + XCTAssertThrowsError(try future.wait()) { error in + XCTAssert(type(of: error) == EventLoopFutureTestError.self) + } + } + + func testWhenAllResolvesAfterFutures() throws { + let group = MultiThreadedEventLoopGroup(numberOfThreads: 6) + defer { + XCTAssertNoThrow(try group.syncShutdownGracefully()) + } + + let promises = (0..<5).map { _ in group.next().newPromise(of: Int.self) } + let futures = promises.map { $0.futureResult } + + var succeeded = false + var completedPromises = false + + let mainFuture = EventLoopFuture.whenAll(futures, eventLoop: group.next()) + mainFuture.whenSuccess { _ in + XCTAssertTrue(completedPromises) + XCTAssertFalse(succeeded) + succeeded = true + } + + // Should be false, as none of the promises have completed yet + XCTAssertFalse(succeeded) + + // complete the first four promises + for (index, promise) in promises.dropLast().enumerated() { + promise.succeed(result: index) + } + + // Should still be false, as one promise hasn't completed yet + XCTAssertFalse(succeeded) + + // Complete the last promise + completedPromises = true + promises.last!.succeed(result: 4) + + let results = try assertNoThrowWithValue(mainFuture.wait()) + XCTAssertEqual(results, [0, 1, 2, 3, 4]) + } + + func testWhenAllIsIndependentOfFulfillmentOrder() throws { + let group = MultiThreadedEventLoopGroup(numberOfThreads: 6) + defer { + XCTAssertNoThrow(try group.syncShutdownGracefully()) + } + + let expected = Array(0..<1000) + let promises = expected.map { _ in group.next().newPromise(of: Int.self) } + let futures = promises.map { $0.futureResult } + + var succeeded = false + var completedPromises = false + + let mainFuture = EventLoopFuture.whenAll(futures, eventLoop: group.next()) + mainFuture.whenSuccess { _ in + XCTAssertTrue(completedPromises) + XCTAssertFalse(succeeded) + succeeded = true + } + + for index in expected.reversed() { + if index == 0 { + completedPromises = true + } + promises[index].succeed(result: index) + } + + let results = try assertNoThrowWithValue(mainFuture.wait()) + XCTAssertEqual(results, expected) + } }