Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Significantly improve the performance of EventLoopFuture.{and,when}AllSucceed. #943

Merged
merged 5 commits into from
Apr 5, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
110 changes: 99 additions & 11 deletions Sources/NIO/EventLoopFuture.swift
Original file line number Diff line number Diff line change
Expand Up @@ -1032,7 +1032,17 @@ extension EventLoopFuture {
/// - on: The `EventLoop` on which the new `EventLoopFuture` callbacks will execute on.
/// - Returns: A new `EventLoopFuture` that waits for the other futures to succeed.
public static func andAllSucceed(_ futures: [EventLoopFuture<Value>], on eventLoop: EventLoop) -> EventLoopFuture<Void> {
return .reduce((), futures, on: eventLoop) { (_: (), _: Value) in }
let promise = eventLoop.makePromise(of: Void.self)

if eventLoop.inEventLoop {
self._reduceSuccesses0(promise, futures, eventLoop, onValue: { _, _ in })
} else {
eventLoop.execute {
self._reduceSuccesses0(promise, futures, eventLoop, onValue: { _, _ in })
}
}

return promise.futureResult
}

/// Returns a new `EventLoopFuture` that succeeds only if all of the provided futures succeed.
Expand All @@ -1044,7 +1054,76 @@ extension EventLoopFuture {
/// - on: The `EventLoop` on which the new `EventLoopFuture` callbacks will fire.
/// - Returns: A new `EventLoopFuture` with all of the values fulfilled by the provided futures.
public static func whenAllSucceed(_ futures: [EventLoopFuture<Value>], on eventLoop: EventLoop) -> EventLoopFuture<[Value]> {
return .reduce(into: [], futures, on: eventLoop) { (results, value) in results.append(value) }
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Both of these methods were using .reduce, and I can't help but wonder if perhaps the performance boost could have been done in that method?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm sure reduce and fold have potential for further optimization, but I don't think their performance will come anywhere close to this bespoke implementation. For example, the argument to fold is a closure that returns a future. I.e. we'd need to create at least one extra future (several more with the current implementation of fold, actually) for each element in the array. That alone generates so much overhead that it can't match the performance of bespoke implementations.

There could be a point made about when the performance of a bespoke implementation is needed, however, Vapor & Co. process a ton of already-fulfilled futures where this is significant.

let promise = eventLoop.makePromise(of: Void.self)

var results: [Value?] = .init(repeating: nil, count: futures.count)
let callback = { (index: Int, result: Value) in
results[index] = result
}

if eventLoop.inEventLoop {
self._reduceSuccesses0(promise, futures, eventLoop, onValue: callback)
} else {
eventLoop.execute {
self._reduceSuccesses0(promise, futures, eventLoop, onValue: callback)
}
}

return promise.futureResult.map {
// verify that all operations have been completed
assert(!results.contains(where: { $0 == nil }))
return results.map { $0! }
}
}

/// Loops through the futures array and attaches callbacks to execute `onValue` on the provided `EventLoop` when
/// they succeed. The `onValue` will receive the index of the future that fulfilled the provided `Result`.
///
/// Once all the futures have succeed, the provided promise will succeed.
/// Once any future fails, the provided promise will fail.
private static func _reduceSuccesses0<InputValue>(_ promise: EventLoopPromise<Void>,
_ futures: [EventLoopFuture<InputValue>],
_ eventLoop: EventLoop,
onValue: @escaping (Int, InputValue) -> Void) {
eventLoop.assertInEventLoop()

var remainingCount = futures.count

if remainingCount == 0 {
promise.succeed(())
return
}

// Sends the result to `onValue` in case of success and succeeds/fails the input promise, if appropriate.
func processResult(_ index: Int, _ result: Result<InputValue, Error>) {
switch result {
case .success(let result):
onValue(index, result)
remainingCount -= 1

if remainingCount == 0 {
promise.succeed(())
}
case .failure(let error):
promise.fail(error)
}
}
// loop through the futures to chain callbacks to execute on the initiating event loop and grab their index
// in the "futures" to pass their result to the caller
for (index, future) in futures.enumerated() {
if future.eventLoop.inEventLoop,
let result = future._value {
// Fast-track already-fulfilled results without the overhead of calling `whenComplete`. This can yield a
// ~20% performance improvement in the case of large arrays where all elements are already fulfilled.
processResult(index, result)
if case .failure = result {
return // Once the promise is failed, future results do not need to be processed.
}
} else {
future.hop(to: eventLoop)
.whenComplete { result in processResult(index, result) }
}
}
}
}

Expand Down Expand Up @@ -1130,18 +1209,27 @@ extension EventLoopFuture {
return
}

// Sends the result to `onResult` in case of success and succeeds the input promise, if appropriate.
func processResult(_ index: Int, _ result: Result<InputValue, Error>) {
onResult(index, result)
remainingCount -= 1

if remainingCount == 0 {
promise.succeed(())
}
}
// loop through the futures to chain callbacks to execute on the initiating event loop and grab their index
// in the "futures" to pass their result to the caller
for (index, future) in futures.enumerated() {
future.hop(to: eventLoop)
.whenComplete { result in
onResult(index, result)
remainingCount -= 1

guard remainingCount == 0 else { return }

promise.succeed(())
}
if future.eventLoop.inEventLoop,
let result = future._value {
// Fast-track already-fulfilled results without the overhead of calling `whenComplete`. This can yield a
// ~30% performance improvement in the case of large arrays where all elements are already fulfilled.
processResult(index, result)
} else {
future.hop(to: eventLoop)
.whenComplete { result in processResult(index, result) }
}
}
}
}
Expand Down
87 changes: 87 additions & 0 deletions Sources/NIOPerformanceTester/main.swift
Original file line number Diff line number Diff line change
Expand Up @@ -614,3 +614,90 @@ measureAndPrint(desc: "http1_10k_reqs_100_conns") {
}
return reqs.reduce(0, +) / reqsPerConn
}

measureAndPrint(desc: "future_whenallsucceed_100k_immediately_succeeded_off_loop") {
let loop = group.next()
let expected = Array(0..<100_000)
let futures = expected.map { loop.makeSucceededFuture($0) }
let allSucceeded = try! EventLoopFuture.whenAllSucceed(futures, on: loop).wait()
return allSucceeded.count
}

measureAndPrint(desc: "future_whenallsucceed_100k_immediately_succeeded_on_loop") {
let loop = group.next()
let expected = Array(0..<100_000)
let allSucceeded = try! loop.makeSucceededFuture(()).flatMap { _ -> EventLoopFuture<[Int]> in
let futures = expected.map { loop.makeSucceededFuture($0) }
return EventLoopFuture.whenAllSucceed(futures, on: loop)
}.wait()
return allSucceeded.count
}

measureAndPrint(desc: "future_whenallsucceed_100k_deferred_off_loop") {
let loop = group.next()
let expected = Array(0..<100_000)
let promises = expected.map { _ in loop.makePromise(of: Int.self) }
let allSucceeded = EventLoopFuture.whenAllSucceed(promises.map { $0.futureResult }, on: loop)
for (index, promise) in promises.enumerated() {
promise.succeed(index)
}
return try! allSucceeded.wait().count
}

measureAndPrint(desc: "future_whenallsucceed_100k_deferred_on_loop") {
let loop = group.next()
let expected = Array(0..<100_000)
let promises = expected.map { _ in loop.makePromise(of: Int.self) }
let allSucceeded = try! loop.makeSucceededFuture(()).flatMap { _ -> EventLoopFuture<[Int]> in
let result = EventLoopFuture.whenAllSucceed(promises.map { $0.futureResult }, on: loop)
for (index, promise) in promises.enumerated() {
promise.succeed(index)
}
return result
}.wait()
return allSucceeded.count
}


measureAndPrint(desc: "future_whenallcomplete_100k_immediately_succeeded_off_loop") {
let loop = group.next()
let expected = Array(0..<100_000)
let futures = expected.map { loop.makeSucceededFuture($0) }
let allSucceeded = try! EventLoopFuture.whenAllComplete(futures, on: loop).wait()
return allSucceeded.count
}

measureAndPrint(desc: "future_whenallcomplete_100k_immediately_succeeded_on_loop") {
let loop = group.next()
let expected = Array(0..<100_000)
let allSucceeded = try! loop.makeSucceededFuture(()).flatMap { _ -> EventLoopFuture<[Result<Int, Error>]> in
let futures = expected.map { loop.makeSucceededFuture($0) }
return EventLoopFuture.whenAllComplete(futures, on: loop)
}.wait()
return allSucceeded.count
}

measureAndPrint(desc: "future_whenallcomplete_100k_deferred_off_loop") {
let loop = group.next()
let expected = Array(0..<100_000)
let promises = expected.map { _ in loop.makePromise(of: Int.self) }
let allSucceeded = EventLoopFuture.whenAllComplete(promises.map { $0.futureResult }, on: loop)
for (index, promise) in promises.enumerated() {
promise.succeed(index)
}
return try! allSucceeded.wait().count
}

measureAndPrint(desc: "future_whenallcomplete_100k_deferred_on_loop") {
let loop = group.next()
let expected = Array(0..<100_000)
let promises = expected.map { _ in loop.makePromise(of: Int.self) }
let allSucceeded = try! loop.makeSucceededFuture(()).flatMap { _ -> EventLoopFuture<[Result<Int, Error>]> in
let result = EventLoopFuture.whenAllComplete(promises.map { $0.futureResult }, on: loop)
for (index, promise) in promises.enumerated() {
promise.succeed(index)
}
return result
}.wait()
return allSucceeded.count
}
3 changes: 3 additions & 0 deletions Tests/NIOTests/EventLoopFutureTest+XCTest.swift
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,9 @@ extension EventLoopFutureTest {
("testLoopHoppingHelperNoHopping", testLoopHoppingHelperNoHopping),
("testFlatMapResultHappyPath", testFlatMapResultHappyPath),
("testFlatMapResultFailurePath", testFlatMapResultFailurePath),
("testWhenAllSucceedFailsImmediately", testWhenAllSucceedFailsImmediately),
("testWhenAllSucceedResolvesAfterFutures", testWhenAllSucceedResolvesAfterFutures),
("testWhenAllSucceedIsIndependentOfFulfillmentOrder", testWhenAllSucceedIsIndependentOfFulfillmentOrder),
("testWhenAllCompleteResultsWithFailuresStillSucceed", testWhenAllCompleteResultsWithFailuresStillSucceed),
("testWhenAllCompleteResults", testWhenAllCompleteResults),
("testWhenAllCompleteResolvesAfterFutures", testWhenAllCompleteResolvesAfterFutures),
Expand Down
84 changes: 84 additions & 0 deletions Tests/NIOTests/EventLoopFutureTest.swift
Original file line number Diff line number Diff line change
Expand Up @@ -860,6 +860,90 @@ class EventLoopFutureTest : XCTestCase {
}
}

func testWhenAllSucceedFailsImmediately() {
let group = MultiThreadedEventLoopGroup(numberOfThreads: 2)
defer {
XCTAssertNoThrow(try group.syncShutdownGracefully())
}

let promises = [group.next().makePromise(of: Int.self),
group.next().makePromise(of: Int.self)]
let future = EventLoopFuture.whenAllSucceed(promises.map { $0.futureResult }, on: group.next())
promises[0].fail(EventLoopFutureTestError.example)
XCTAssertThrowsError(try future.wait()) { error in
XCTAssert(type(of: error) == EventLoopFutureTestError.self)
}
}

func testWhenAllSucceedResolvesAfterFutures() throws {
let group = MultiThreadedEventLoopGroup(numberOfThreads: 6)
defer {
XCTAssertNoThrow(try group.syncShutdownGracefully())
}

let promises = (0..<5).map { _ in group.next().makePromise(of: Int.self) }
let futures = promises.map { $0.futureResult }

var succeeded = false
var completedPromises = false

let mainFuture = EventLoopFuture.whenAllSucceed(futures, on: group.next())
mainFuture.whenSuccess { _ in
XCTAssertTrue(completedPromises)
XCTAssertFalse(succeeded)
succeeded = true
}

// Should be false, as none of the promises have completed yet
XCTAssertFalse(succeeded)

// complete the first four promises
for (index, promise) in promises.dropLast().enumerated() {
promise.succeed(index)
}

// Should still be false, as one promise hasn't completed yet
XCTAssertFalse(succeeded)

// Complete the last promise
completedPromises = true
promises.last!.succeed(4)

let results = try assertNoThrowWithValue(mainFuture.wait())
XCTAssertEqual(results, [0, 1, 2, 3, 4])
}

func testWhenAllSucceedIsIndependentOfFulfillmentOrder() throws {
let group = MultiThreadedEventLoopGroup(numberOfThreads: 6)
defer {
XCTAssertNoThrow(try group.syncShutdownGracefully())
}

let expected = Array(0..<1000)
let promises = expected.map { _ in group.next().makePromise(of: Int.self) }
let futures = promises.map { $0.futureResult }

var succeeded = false
var completedPromises = false

let mainFuture = EventLoopFuture.whenAllSucceed(futures, on: group.next())
mainFuture.whenSuccess { _ in
XCTAssertTrue(completedPromises)
XCTAssertFalse(succeeded)
succeeded = true
}

for index in expected.reversed() {
if index == 0 {
completedPromises = true
}
promises[index].succeed(index)
}

let results = try assertNoThrowWithValue(mainFuture.wait())
XCTAssertEqual(results, expected)
}

func testWhenAllCompleteResultsWithFailuresStillSucceed() {
let group = MultiThreadedEventLoopGroup(numberOfThreads: 2)
defer {
Expand Down