From f7f65a44585ae6c72c7e8f1d03047d5343baee1b Mon Sep 17 00:00:00 2001 From: Daniel Alm Date: Wed, 3 Apr 2019 17:03:28 +0200 Subject: [PATCH 1/4] Significantly improve the performance of `EventLoopFuture.{and,when}AllSucceed`. --- Sources/NIO/EventLoopFuture.swift | 90 ++++++++++++++++++- Sources/NIOPerformanceTester/main.swift | 87 ++++++++++++++++++ .../NIOTests/EventLoopFutureTest+XCTest.swift | 3 + Tests/NIOTests/EventLoopFutureTest.swift | 84 +++++++++++++++++ 4 files changed, 262 insertions(+), 2 deletions(-) diff --git a/Sources/NIO/EventLoopFuture.swift b/Sources/NIO/EventLoopFuture.swift index 0377247ce4..9af8254945 100644 --- a/Sources/NIO/EventLoopFuture.swift +++ b/Sources/NIO/EventLoopFuture.swift @@ -1032,7 +1032,17 @@ extension EventLoopFuture { /// - on: The `EventLoop` on which the new `EventLoopFuture` callbacks will execute on. /// - Returns: A new `EventLoopFuture` that waits for the other futures to succeed. public static func andAllSucceed(_ futures: [EventLoopFuture], on eventLoop: EventLoop) -> EventLoopFuture { - return .reduce((), futures, on: eventLoop) { (_: (), _: Value) in } + let promise = eventLoop.makePromise(of: Void.self) + + if eventLoop.inEventLoop { + self._reduceSuccesses0(promise, futures, eventLoop, onResult: { _, _ in }) + } else { + eventLoop.execute { + self._reduceSuccesses0(promise, futures, eventLoop, onResult: { _, _ in }) + } + } + + return promise.futureResult } /// Returns a new `EventLoopFuture` that succeeds only if all of the provided futures succeed. @@ -1044,7 +1054,83 @@ extension EventLoopFuture { /// - on: The `EventLoop` on which the new `EventLoopFuture` callbacks will fire. /// - Returns: A new `EventLoopFuture` with all of the values fulfilled by the provided futures. public static func whenAllSucceed(_ futures: [EventLoopFuture], on eventLoop: EventLoop) -> EventLoopFuture<[Value]> { - return .reduce(into: [], futures, on: eventLoop) { (results, value) in results.append(value) } + let promise = eventLoop.makePromise(of: Void.self) + + var results: [Value?] = .init(repeating: nil, count: futures.count) + let callback = { (index: Int, result: Value) in + results[index] = result + } + + if eventLoop.inEventLoop { + self._reduceSuccesses0(promise, futures, eventLoop, onResult: callback) + } else { + eventLoop.execute { + self._reduceSuccesses0(promise, futures, eventLoop, onResult: callback) + } + } + + return promise.futureResult.map { + // verify that all operations have been completed + assert(!results.contains(where: { $0 == nil })) + return results.map { $0! } + } + } + + /// Loops through the futures array and attaches callbacks to execute `onResult` on the provided `EventLoop` when + /// they succeed. The `onResult` will receive the index of the future that fulfilled the provided `Result`. + /// + /// Once all the futures have succeed, the provided promise will succeed. + /// Once any future fails, the provided promise will fail. + private static func _reduceSuccesses0(_ promise: EventLoopPromise, + _ futures: [EventLoopFuture], + _ eventLoop: EventLoop, + onResult: @escaping (Int, InputValue) -> Void) { + eventLoop.assertInEventLoop() + + var remainingCount = futures.count + + if remainingCount == 0 { + promise.succeed(()) + return + } + + // loop through the futures to chain callbacks to execute on the initiating event loop and grab their index + // in the "futures" to pass their result to the caller + for (index, future) in futures.enumerated() { + if future.eventLoop === eventLoop, + let result = future._value { + // Fast-track already-fulfilled results without the overhead of calling `whenComplete`. This can yield a + // ~20% performance improvement in the case of large arrays where all elements are already fulfilled. + switch result { + case .success(let result): + onResult(index, result) + remainingCount -= 1 + + if remainingCount == 0 { + promise.succeed(()) + } + + case .failure(let error): + promise.fail(error) + return // Once the promise is failed, future results do not need to be processed. + } + } else { + future.hop(to: eventLoop) + .whenComplete { result in + switch result { + case .success(let result): + onResult(index, result) + remainingCount -= 1 + + if remainingCount == 0 { + promise.succeed(()) + } + + case .failure(let error): promise.fail(error) + } + } + } + } } } diff --git a/Sources/NIOPerformanceTester/main.swift b/Sources/NIOPerformanceTester/main.swift index e7dd71e496..0443df87cf 100644 --- a/Sources/NIOPerformanceTester/main.swift +++ b/Sources/NIOPerformanceTester/main.swift @@ -614,3 +614,90 @@ measureAndPrint(desc: "http1_10k_reqs_100_conns") { } return reqs.reduce(0, +) / reqsPerConn } + +measureAndPrint(desc: "future_whenallsucceed_100k_immediately_succeeded_off_loop") { + let loop = group.next() + let expected = Array(0..<100_000) + let futures = expected.map { loop.makeSucceededFuture($0) } + let allSucceeded = try! EventLoopFuture.whenAllSucceed(futures, on: loop).wait() + return allSucceeded.count +} + +measureAndPrint(desc: "future_whenallsucceed_100k_immediately_succeeded_on_loop") { + let loop = group.next() + let expected = Array(0..<100_000) + let allSucceeded = try! loop.makeSucceededFuture(()).flatMap { _ -> EventLoopFuture<[Int]> in + let futures = expected.map { loop.makeSucceededFuture($0) } + return EventLoopFuture.whenAllSucceed(futures, on: loop) + }.wait() + return allSucceeded.count +} + +measureAndPrint(desc: "future_whenallsucceed_100k_deferred_off_loop") { + let loop = group.next() + let expected = Array(0..<100_000) + let promises = expected.map { _ in loop.makePromise(of: Int.self) } + let allSucceeded = EventLoopFuture.whenAllSucceed(promises.map { $0.futureResult }, on: loop) + for (index, promise) in promises.enumerated() { + promise.succeed(index) + } + return try! allSucceeded.wait().count +} + +measureAndPrint(desc: "future_whenallsucceed_100k_deferred_on_loop") { + let loop = group.next() + let expected = Array(0..<100_000) + let promises = expected.map { _ in loop.makePromise(of: Int.self) } + let allSucceeded = try! loop.makeSucceededFuture(()).flatMap { _ -> EventLoopFuture<[Int]> in + let result = EventLoopFuture.whenAllSucceed(promises.map { $0.futureResult }, on: loop) + for (index, promise) in promises.enumerated() { + promise.succeed(index) + } + return result + }.wait() + return allSucceeded.count +} + + +measureAndPrint(desc: "future_whenallcomplete_100k_immediately_succeeded_off_loop") { + let loop = group.next() + let expected = Array(0..<100_000) + let futures = expected.map { loop.makeSucceededFuture($0) } + let allSucceeded = try! EventLoopFuture.whenAllComplete(futures, on: loop).wait() + return allSucceeded.count +} + +measureAndPrint(desc: "future_whenallcomplete_100k_immediately_succeeded_on_loop") { + let loop = group.next() + let expected = Array(0..<100_000) + let allSucceeded = try! loop.makeSucceededFuture(()).flatMap { _ -> EventLoopFuture<[Result]> in + let futures = expected.map { loop.makeSucceededFuture($0) } + return EventLoopFuture.whenAllComplete(futures, on: loop) + }.wait() + return allSucceeded.count +} + +measureAndPrint(desc: "future_whenallcomplete_100k_deferred_off_loop") { + let loop = group.next() + let expected = Array(0..<100_000) + let promises = expected.map { _ in loop.makePromise(of: Int.self) } + let allSucceeded = EventLoopFuture.whenAllComplete(promises.map { $0.futureResult }, on: loop) + for (index, promise) in promises.enumerated() { + promise.succeed(index) + } + return try! allSucceeded.wait().count +} + +measureAndPrint(desc: "future_whenallcomplete_100k_deferred_on_loop") { + let loop = group.next() + let expected = Array(0..<100_000) + let promises = expected.map { _ in loop.makePromise(of: Int.self) } + let allSucceeded = try! loop.makeSucceededFuture(()).flatMap { _ -> EventLoopFuture<[Result]> in + let result = EventLoopFuture.whenAllComplete(promises.map { $0.futureResult }, on: loop) + for (index, promise) in promises.enumerated() { + promise.succeed(index) + } + return result + }.wait() + return allSucceeded.count +} diff --git a/Tests/NIOTests/EventLoopFutureTest+XCTest.swift b/Tests/NIOTests/EventLoopFutureTest+XCTest.swift index 1f163120fa..1db1a5b39d 100644 --- a/Tests/NIOTests/EventLoopFutureTest+XCTest.swift +++ b/Tests/NIOTests/EventLoopFutureTest+XCTest.swift @@ -64,6 +64,9 @@ extension EventLoopFutureTest { ("testLoopHoppingHelperNoHopping", testLoopHoppingHelperNoHopping), ("testFlatMapResultHappyPath", testFlatMapResultHappyPath), ("testFlatMapResultFailurePath", testFlatMapResultFailurePath), + ("testWhenAllSucceedFailsImmediately", testWhenAllSucceedFailsImmediately), + ("testWhenAllSucceedResolvesAfterFutures", testWhenAllSucceedResolvesAfterFutures), + ("testWhenAllSucceedIsIndependentOfFulfillmentOrder", testWhenAllSucceedIsIndependentOfFulfillmentOrder), ("testWhenAllCompleteResultsWithFailuresStillSucceed", testWhenAllCompleteResultsWithFailuresStillSucceed), ("testWhenAllCompleteResults", testWhenAllCompleteResults), ("testWhenAllCompleteResolvesAfterFutures", testWhenAllCompleteResolvesAfterFutures), diff --git a/Tests/NIOTests/EventLoopFutureTest.swift b/Tests/NIOTests/EventLoopFutureTest.swift index b2bea8179e..5a3258b1d6 100644 --- a/Tests/NIOTests/EventLoopFutureTest.swift +++ b/Tests/NIOTests/EventLoopFutureTest.swift @@ -860,6 +860,90 @@ class EventLoopFutureTest : XCTestCase { } } + func testWhenAllSucceedFailsImmediately() { + let group = MultiThreadedEventLoopGroup(numberOfThreads: 2) + defer { + XCTAssertNoThrow(try group.syncShutdownGracefully()) + } + + let promises = [group.next().makePromise(of: Int.self), + group.next().makePromise(of: Int.self)] + let future = EventLoopFuture.whenAllSucceed(promises.map { $0.futureResult }, on: group.next()) + promises[0].fail(EventLoopFutureTestError.example) + XCTAssertThrowsError(try future.wait()) { error in + XCTAssert(type(of: error) == EventLoopFutureTestError.self) + } + } + + func testWhenAllSucceedResolvesAfterFutures() throws { + let group = MultiThreadedEventLoopGroup(numberOfThreads: 6) + defer { + XCTAssertNoThrow(try group.syncShutdownGracefully()) + } + + let promises = (0..<5).map { _ in group.next().makePromise(of: Int.self) } + let futures = promises.map { $0.futureResult } + + var succeeded = false + var completedPromises = false + + let mainFuture = EventLoopFuture.whenAllSucceed(futures, on: group.next()) + mainFuture.whenSuccess { _ in + XCTAssertTrue(completedPromises) + XCTAssertFalse(succeeded) + succeeded = true + } + + // Should be false, as none of the promises have completed yet + XCTAssertFalse(succeeded) + + // complete the first four promises + for (index, promise) in promises.dropLast().enumerated() { + promise.succeed(index) + } + + // Should still be false, as one promise hasn't completed yet + XCTAssertFalse(succeeded) + + // Complete the last promise + completedPromises = true + promises.last!.succeed(4) + + let results = try assertNoThrowWithValue(mainFuture.wait()) + XCTAssertEqual(results, [0, 1, 2, 3, 4]) + } + + func testWhenAllSucceedIsIndependentOfFulfillmentOrder() throws { + let group = MultiThreadedEventLoopGroup(numberOfThreads: 6) + defer { + XCTAssertNoThrow(try group.syncShutdownGracefully()) + } + + let expected = Array(0..<1000) + let promises = expected.map { _ in group.next().makePromise(of: Int.self) } + let futures = promises.map { $0.futureResult } + + var succeeded = false + var completedPromises = false + + let mainFuture = EventLoopFuture.whenAllSucceed(futures, on: group.next()) + mainFuture.whenSuccess { _ in + XCTAssertTrue(completedPromises) + XCTAssertFalse(succeeded) + succeeded = true + } + + for index in expected.reversed() { + if index == 0 { + completedPromises = true + } + promises[index].succeed(index) + } + + let results = try assertNoThrowWithValue(mainFuture.wait()) + XCTAssertEqual(results, expected) + } + func testWhenAllCompleteResultsWithFailuresStillSucceed() { let group = MultiThreadedEventLoopGroup(numberOfThreads: 2) defer { From 52e24f0b85ddeda365b5bbbc5feba71fa0288562 Mon Sep 17 00:00:00 2001 From: Daniel Alm Date: Wed, 3 Apr 2019 18:17:17 +0200 Subject: [PATCH 2/4] PR improvements. --- Sources/NIO/EventLoopFuture.swift | 53 ++++++++++++++----------------- 1 file changed, 24 insertions(+), 29 deletions(-) diff --git a/Sources/NIO/EventLoopFuture.swift b/Sources/NIO/EventLoopFuture.swift index 9af8254945..1724181858 100644 --- a/Sources/NIO/EventLoopFuture.swift +++ b/Sources/NIO/EventLoopFuture.swift @@ -1035,10 +1035,10 @@ extension EventLoopFuture { let promise = eventLoop.makePromise(of: Void.self) if eventLoop.inEventLoop { - self._reduceSuccesses0(promise, futures, eventLoop, onResult: { _, _ in }) + self._reduceSuccesses0(promise, futures, eventLoop, onValue: { _, _ in }) } else { eventLoop.execute { - self._reduceSuccesses0(promise, futures, eventLoop, onResult: { _, _ in }) + self._reduceSuccesses0(promise, futures, eventLoop, onValue: { _, _ in }) } } @@ -1062,10 +1062,10 @@ extension EventLoopFuture { } if eventLoop.inEventLoop { - self._reduceSuccesses0(promise, futures, eventLoop, onResult: callback) + self._reduceSuccesses0(promise, futures, eventLoop, onValue: callback) } else { eventLoop.execute { - self._reduceSuccesses0(promise, futures, eventLoop, onResult: callback) + self._reduceSuccesses0(promise, futures, eventLoop, onValue: callback) } } @@ -1076,15 +1076,15 @@ extension EventLoopFuture { } } - /// Loops through the futures array and attaches callbacks to execute `onResult` on the provided `EventLoop` when - /// they succeed. The `onResult` will receive the index of the future that fulfilled the provided `Result`. + /// Loops through the futures array and attaches callbacks to execute `onValue` on the provided `EventLoop` when + /// they succeed. The `onValue` will receive the index of the future that fulfilled the provided `Result`. /// /// Once all the futures have succeed, the provided promise will succeed. /// Once any future fails, the provided promise will fail. private static func _reduceSuccesses0(_ promise: EventLoopPromise, _ futures: [EventLoopFuture], _ eventLoop: EventLoop, - onResult: @escaping (Int, InputValue) -> Void) { + onValue: @escaping (Int, InputValue) -> Void) { eventLoop.assertInEventLoop() var remainingCount = futures.count @@ -1094,6 +1094,20 @@ extension EventLoopFuture { return } + // Sends the result to `onValue` in case of success and succeeds/fails the input promise, if appropriate. + func processResult(_ index: Int, _ result: Result) { + switch result { + case .success(let result): + onValue(index, result) + remainingCount -= 1 + + if remainingCount == 0 { + promise.succeed(()) + } + case .failure(let error): + promise.fail(error) + } + } // loop through the futures to chain callbacks to execute on the initiating event loop and grab their index // in the "futures" to pass their result to the caller for (index, future) in futures.enumerated() { @@ -1101,33 +1115,14 @@ extension EventLoopFuture { let result = future._value { // Fast-track already-fulfilled results without the overhead of calling `whenComplete`. This can yield a // ~20% performance improvement in the case of large arrays where all elements are already fulfilled. - switch result { - case .success(let result): - onResult(index, result) - remainingCount -= 1 - - if remainingCount == 0 { - promise.succeed(()) - } - - case .failure(let error): - promise.fail(error) + processResult(index, result) + if case .failure = result { return // Once the promise is failed, future results do not need to be processed. } } else { future.hop(to: eventLoop) .whenComplete { result in - switch result { - case .success(let result): - onResult(index, result) - remainingCount -= 1 - - if remainingCount == 0 { - promise.succeed(()) - } - - case .failure(let error): promise.fail(error) - } + processResult(index, result) } } } From fb21584e523c3c2041f83799f291c39b59b1b7c1 Mon Sep 17 00:00:00 2001 From: Daniel Alm Date: Wed, 3 Apr 2019 18:22:49 +0200 Subject: [PATCH 3/4] Add the fast track for `AllComplete` as well. --- Sources/NIO/EventLoopFuture.swift | 31 +++++++++++++++++++------------ 1 file changed, 19 insertions(+), 12 deletions(-) diff --git a/Sources/NIO/EventLoopFuture.swift b/Sources/NIO/EventLoopFuture.swift index 1724181858..8512d84dcf 100644 --- a/Sources/NIO/EventLoopFuture.swift +++ b/Sources/NIO/EventLoopFuture.swift @@ -1121,9 +1121,7 @@ extension EventLoopFuture { } } else { future.hop(to: eventLoop) - .whenComplete { result in - processResult(index, result) - } + .whenComplete { result in processResult(index, result) } } } } @@ -1211,18 +1209,27 @@ extension EventLoopFuture { return } + // Sends the result to `onResult` in case of success and succeeds the input promise, if appropriate. + func processResult(_ index: Int, _ result: Result) { + onResult(index, result) + remainingCount -= 1 + + if remainingCount == 0 { + promise.succeed(()) + } + } // loop through the futures to chain callbacks to execute on the initiating event loop and grab their index // in the "futures" to pass their result to the caller for (index, future) in futures.enumerated() { - future.hop(to: eventLoop) - .whenComplete { result in - onResult(index, result) - remainingCount -= 1 - - guard remainingCount == 0 else { return } - - promise.succeed(()) - } + if future.eventLoop === eventLoop, + let result = future._value { + // Fast-track already-fulfilled results without the overhead of calling `whenComplete`. This can yield a + // ~30% performance improvement in the case of large arrays where all elements are already fulfilled. + processResult(index, result) + } else { + future.hop(to: eventLoop) + .whenComplete { result in processResult(index, result) } + } } } } From 23a111489e0590d64fcec05fce593b62567580ca Mon Sep 17 00:00:00 2001 From: Daniel Alm Date: Thu, 4 Apr 2019 10:40:35 +0200 Subject: [PATCH 4/4] Use `future.inEventLoop` instead of pointer comparison. --- Sources/NIO/EventLoopFuture.swift | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/Sources/NIO/EventLoopFuture.swift b/Sources/NIO/EventLoopFuture.swift index 8512d84dcf..a3c17477f4 100644 --- a/Sources/NIO/EventLoopFuture.swift +++ b/Sources/NIO/EventLoopFuture.swift @@ -1111,7 +1111,7 @@ extension EventLoopFuture { // loop through the futures to chain callbacks to execute on the initiating event loop and grab their index // in the "futures" to pass their result to the caller for (index, future) in futures.enumerated() { - if future.eventLoop === eventLoop, + if future.eventLoop.inEventLoop, let result = future._value { // Fast-track already-fulfilled results without the overhead of calling `whenComplete`. This can yield a // ~20% performance improvement in the case of large arrays where all elements are already fulfilled. @@ -1221,7 +1221,7 @@ extension EventLoopFuture { // loop through the futures to chain callbacks to execute on the initiating event loop and grab their index // in the "futures" to pass their result to the caller for (index, future) in futures.enumerated() { - if future.eventLoop === eventLoop, + if future.eventLoop.inEventLoop, let result = future._value { // Fast-track already-fulfilled results without the overhead of calling `whenComplete`. This can yield a // ~30% performance improvement in the case of large arrays where all elements are already fulfilled.