diff --git a/Sources/NIO/EventLoopFuture.swift b/Sources/NIO/EventLoopFuture.swift index 0377247ce4..9af8254945 100644 --- a/Sources/NIO/EventLoopFuture.swift +++ b/Sources/NIO/EventLoopFuture.swift @@ -1032,7 +1032,17 @@ extension EventLoopFuture { /// - on: The `EventLoop` on which the new `EventLoopFuture` callbacks will execute on. /// - Returns: A new `EventLoopFuture` that waits for the other futures to succeed. public static func andAllSucceed(_ futures: [EventLoopFuture], on eventLoop: EventLoop) -> EventLoopFuture { - return .reduce((), futures, on: eventLoop) { (_: (), _: Value) in } + let promise = eventLoop.makePromise(of: Void.self) + + if eventLoop.inEventLoop { + self._reduceSuccesses0(promise, futures, eventLoop, onResult: { _, _ in }) + } else { + eventLoop.execute { + self._reduceSuccesses0(promise, futures, eventLoop, onResult: { _, _ in }) + } + } + + return promise.futureResult } /// Returns a new `EventLoopFuture` that succeeds only if all of the provided futures succeed. @@ -1044,7 +1054,83 @@ extension EventLoopFuture { /// - on: The `EventLoop` on which the new `EventLoopFuture` callbacks will fire. /// - Returns: A new `EventLoopFuture` with all of the values fulfilled by the provided futures. public static func whenAllSucceed(_ futures: [EventLoopFuture], on eventLoop: EventLoop) -> EventLoopFuture<[Value]> { - return .reduce(into: [], futures, on: eventLoop) { (results, value) in results.append(value) } + let promise = eventLoop.makePromise(of: Void.self) + + var results: [Value?] = .init(repeating: nil, count: futures.count) + let callback = { (index: Int, result: Value) in + results[index] = result + } + + if eventLoop.inEventLoop { + self._reduceSuccesses0(promise, futures, eventLoop, onResult: callback) + } else { + eventLoop.execute { + self._reduceSuccesses0(promise, futures, eventLoop, onResult: callback) + } + } + + return promise.futureResult.map { + // verify that all operations have been completed + assert(!results.contains(where: { $0 == nil })) + return results.map { $0! } + } + } + + /// Loops through the futures array and attaches callbacks to execute `onResult` on the provided `EventLoop` when + /// they succeed. The `onResult` will receive the index of the future that fulfilled the provided `Result`. + /// + /// Once all the futures have succeed, the provided promise will succeed. + /// Once any future fails, the provided promise will fail. + private static func _reduceSuccesses0(_ promise: EventLoopPromise, + _ futures: [EventLoopFuture], + _ eventLoop: EventLoop, + onResult: @escaping (Int, InputValue) -> Void) { + eventLoop.assertInEventLoop() + + var remainingCount = futures.count + + if remainingCount == 0 { + promise.succeed(()) + return + } + + // loop through the futures to chain callbacks to execute on the initiating event loop and grab their index + // in the "futures" to pass their result to the caller + for (index, future) in futures.enumerated() { + if future.eventLoop === eventLoop, + let result = future._value { + // Fast-track already-fulfilled results without the overhead of calling `whenComplete`. This can yield a + // ~20% performance improvement in the case of large arrays where all elements are already fulfilled. + switch result { + case .success(let result): + onResult(index, result) + remainingCount -= 1 + + if remainingCount == 0 { + promise.succeed(()) + } + + case .failure(let error): + promise.fail(error) + return // Once the promise is failed, future results do not need to be processed. + } + } else { + future.hop(to: eventLoop) + .whenComplete { result in + switch result { + case .success(let result): + onResult(index, result) + remainingCount -= 1 + + if remainingCount == 0 { + promise.succeed(()) + } + + case .failure(let error): promise.fail(error) + } + } + } + } } } diff --git a/Sources/NIOPerformanceTester/main.swift b/Sources/NIOPerformanceTester/main.swift index e7dd71e496..0443df87cf 100644 --- a/Sources/NIOPerformanceTester/main.swift +++ b/Sources/NIOPerformanceTester/main.swift @@ -614,3 +614,90 @@ measureAndPrint(desc: "http1_10k_reqs_100_conns") { } return reqs.reduce(0, +) / reqsPerConn } + +measureAndPrint(desc: "future_whenallsucceed_100k_immediately_succeeded_off_loop") { + let loop = group.next() + let expected = Array(0..<100_000) + let futures = expected.map { loop.makeSucceededFuture($0) } + let allSucceeded = try! EventLoopFuture.whenAllSucceed(futures, on: loop).wait() + return allSucceeded.count +} + +measureAndPrint(desc: "future_whenallsucceed_100k_immediately_succeeded_on_loop") { + let loop = group.next() + let expected = Array(0..<100_000) + let allSucceeded = try! loop.makeSucceededFuture(()).flatMap { _ -> EventLoopFuture<[Int]> in + let futures = expected.map { loop.makeSucceededFuture($0) } + return EventLoopFuture.whenAllSucceed(futures, on: loop) + }.wait() + return allSucceeded.count +} + +measureAndPrint(desc: "future_whenallsucceed_100k_deferred_off_loop") { + let loop = group.next() + let expected = Array(0..<100_000) + let promises = expected.map { _ in loop.makePromise(of: Int.self) } + let allSucceeded = EventLoopFuture.whenAllSucceed(promises.map { $0.futureResult }, on: loop) + for (index, promise) in promises.enumerated() { + promise.succeed(index) + } + return try! allSucceeded.wait().count +} + +measureAndPrint(desc: "future_whenallsucceed_100k_deferred_on_loop") { + let loop = group.next() + let expected = Array(0..<100_000) + let promises = expected.map { _ in loop.makePromise(of: Int.self) } + let allSucceeded = try! loop.makeSucceededFuture(()).flatMap { _ -> EventLoopFuture<[Int]> in + let result = EventLoopFuture.whenAllSucceed(promises.map { $0.futureResult }, on: loop) + for (index, promise) in promises.enumerated() { + promise.succeed(index) + } + return result + }.wait() + return allSucceeded.count +} + + +measureAndPrint(desc: "future_whenallcomplete_100k_immediately_succeeded_off_loop") { + let loop = group.next() + let expected = Array(0..<100_000) + let futures = expected.map { loop.makeSucceededFuture($0) } + let allSucceeded = try! EventLoopFuture.whenAllComplete(futures, on: loop).wait() + return allSucceeded.count +} + +measureAndPrint(desc: "future_whenallcomplete_100k_immediately_succeeded_on_loop") { + let loop = group.next() + let expected = Array(0..<100_000) + let allSucceeded = try! loop.makeSucceededFuture(()).flatMap { _ -> EventLoopFuture<[Result]> in + let futures = expected.map { loop.makeSucceededFuture($0) } + return EventLoopFuture.whenAllComplete(futures, on: loop) + }.wait() + return allSucceeded.count +} + +measureAndPrint(desc: "future_whenallcomplete_100k_deferred_off_loop") { + let loop = group.next() + let expected = Array(0..<100_000) + let promises = expected.map { _ in loop.makePromise(of: Int.self) } + let allSucceeded = EventLoopFuture.whenAllComplete(promises.map { $0.futureResult }, on: loop) + for (index, promise) in promises.enumerated() { + promise.succeed(index) + } + return try! allSucceeded.wait().count +} + +measureAndPrint(desc: "future_whenallcomplete_100k_deferred_on_loop") { + let loop = group.next() + let expected = Array(0..<100_000) + let promises = expected.map { _ in loop.makePromise(of: Int.self) } + let allSucceeded = try! loop.makeSucceededFuture(()).flatMap { _ -> EventLoopFuture<[Result]> in + let result = EventLoopFuture.whenAllComplete(promises.map { $0.futureResult }, on: loop) + for (index, promise) in promises.enumerated() { + promise.succeed(index) + } + return result + }.wait() + return allSucceeded.count +} diff --git a/Tests/NIOTests/EventLoopFutureTest.swift b/Tests/NIOTests/EventLoopFutureTest.swift index b2bea8179e..5a3258b1d6 100644 --- a/Tests/NIOTests/EventLoopFutureTest.swift +++ b/Tests/NIOTests/EventLoopFutureTest.swift @@ -860,6 +860,90 @@ class EventLoopFutureTest : XCTestCase { } } + func testWhenAllSucceedFailsImmediately() { + let group = MultiThreadedEventLoopGroup(numberOfThreads: 2) + defer { + XCTAssertNoThrow(try group.syncShutdownGracefully()) + } + + let promises = [group.next().makePromise(of: Int.self), + group.next().makePromise(of: Int.self)] + let future = EventLoopFuture.whenAllSucceed(promises.map { $0.futureResult }, on: group.next()) + promises[0].fail(EventLoopFutureTestError.example) + XCTAssertThrowsError(try future.wait()) { error in + XCTAssert(type(of: error) == EventLoopFutureTestError.self) + } + } + + func testWhenAllSucceedResolvesAfterFutures() throws { + let group = MultiThreadedEventLoopGroup(numberOfThreads: 6) + defer { + XCTAssertNoThrow(try group.syncShutdownGracefully()) + } + + let promises = (0..<5).map { _ in group.next().makePromise(of: Int.self) } + let futures = promises.map { $0.futureResult } + + var succeeded = false + var completedPromises = false + + let mainFuture = EventLoopFuture.whenAllSucceed(futures, on: group.next()) + mainFuture.whenSuccess { _ in + XCTAssertTrue(completedPromises) + XCTAssertFalse(succeeded) + succeeded = true + } + + // Should be false, as none of the promises have completed yet + XCTAssertFalse(succeeded) + + // complete the first four promises + for (index, promise) in promises.dropLast().enumerated() { + promise.succeed(index) + } + + // Should still be false, as one promise hasn't completed yet + XCTAssertFalse(succeeded) + + // Complete the last promise + completedPromises = true + promises.last!.succeed(4) + + let results = try assertNoThrowWithValue(mainFuture.wait()) + XCTAssertEqual(results, [0, 1, 2, 3, 4]) + } + + func testWhenAllSucceedIsIndependentOfFulfillmentOrder() throws { + let group = MultiThreadedEventLoopGroup(numberOfThreads: 6) + defer { + XCTAssertNoThrow(try group.syncShutdownGracefully()) + } + + let expected = Array(0..<1000) + let promises = expected.map { _ in group.next().makePromise(of: Int.self) } + let futures = promises.map { $0.futureResult } + + var succeeded = false + var completedPromises = false + + let mainFuture = EventLoopFuture.whenAllSucceed(futures, on: group.next()) + mainFuture.whenSuccess { _ in + XCTAssertTrue(completedPromises) + XCTAssertFalse(succeeded) + succeeded = true + } + + for index in expected.reversed() { + if index == 0 { + completedPromises = true + } + promises[index].succeed(index) + } + + let results = try assertNoThrowWithValue(mainFuture.wait()) + XCTAssertEqual(results, expected) + } + func testWhenAllCompleteResultsWithFailuresStillSucceed() { let group = MultiThreadedEventLoopGroup(numberOfThreads: 2) defer {