Skip to content

Commit

Permalink
Implement convenience factory methods for Async[Throwing]Stream
Browse files Browse the repository at this point in the history
This is the implementation for swiftlang/swift-evolution#1824
  • Loading branch information
FranzBusch committed Mar 3, 2023
1 parent 814488f commit 26a089e
Show file tree
Hide file tree
Showing 3 changed files with 72 additions and 14 deletions.
20 changes: 20 additions & 0 deletions stdlib/public/Concurrency/AsyncStream.swift
Original file line number Diff line number Diff line change
Expand Up @@ -428,6 +428,26 @@ extension AsyncStream.Continuation {
}
}

@available(SwiftStdlib 5.1, *)
extension AsyncStream {
/// Initializes a new ``AsyncStream`` and an ``AsyncStream/Continuation``.
///
/// - Parameters:
/// - elementType: The element type of the stream.
/// - limit: The buffering policy that the stream should use.
/// - Returns: A tuple containing the stream and its continuation. The continuation should be passed to the
/// producer while the stream should be passed to the consumer.
@backDeployed(before: SwiftStdlib 5.9)
public static func makeStream(
of elementType: Element.Type = Element.self,
bufferingPolicy limit: Continuation.BufferingPolicy = .unbounded
) -> (stream: AsyncStream<Element>, continuation: AsyncStream<Element>.Continuation) {
var continuation: AsyncStream<Element>.Continuation!
let stream = AsyncStream<Element>(bufferingPolicy: limit) { continuation = $0 }
return (stream: stream, continuation: continuation!)
}
}

@available(SwiftStdlib 5.1, *)
extension AsyncStream: @unchecked Sendable where Element: Sendable { }
#else
Expand Down
22 changes: 22 additions & 0 deletions stdlib/public/Concurrency/AsyncThrowingStream.swift
Original file line number Diff line number Diff line change
Expand Up @@ -473,6 +473,28 @@ extension AsyncThrowingStream.Continuation {
}
}

@available(SwiftStdlib 5.1, *)
extension AsyncThrowingStream {
/// Initializes a new ``AsyncThrowingStream`` and an ``AsyncThrowingStream/Continuation``.
///
/// - Parameters:
/// - elementType: The element type of the stream.
/// - failureType: The failure type of the stream.
/// - limit: The buffering policy that the stream should use.
/// - Returns: A tuple containing the stream and its continuation. The continuation should be passed to the
/// producer while the stream should be passed to the consumer.
@backDeployed(before: SwiftStdlib 5.9)
public static func makeStream(
of elementType: Element.Type = Element.self,
throwing failureType: Failure.Type = Failure.self,
bufferingPolicy limit: Continuation.BufferingPolicy = .unbounded
) -> (stream: AsyncThrowingStream<Element, Failure>, continuation: AsyncThrowingStream<Element, Failure>.Continuation) where Failure == Error {
var continuation: AsyncThrowingStream<Element, Failure>.Continuation!
let stream = AsyncThrowingStream<Element, Failure>(bufferingPolicy: limit) { continuation = $0 }
return (stream: stream, continuation: continuation!)
}
}

@available(SwiftStdlib 5.1, *)
extension AsyncThrowingStream: @unchecked Sendable where Element: Sendable { }
#else
Expand Down
44 changes: 30 additions & 14 deletions test/Concurrency/Runtime/async_stream.swift
Original file line number Diff line number Diff line change
Expand Up @@ -7,12 +7,8 @@
// rdar://78109470
// UNSUPPORTED: back_deployment_runtime

// Race condition
// REQUIRES: rdar78033828

import _Concurrency
import StdlibUnittest
import Dispatch

struct SomeError: Error, Equatable {
var value = Int.random(in: 0..<100)
Expand All @@ -27,14 +23,34 @@ var tests = TestSuite("AsyncStream")
var fulfilled = false
}

tests.test("factory method") {
let (stream, continuation) = AsyncStream.makeStream(of: String.self)
continuation.yield("hello")

var iterator = stream.makeAsyncIterator()
expectEqual(await iterator.next(), "hello")
}

tests.test("throwing factory method") {
let (stream, continuation) = AsyncThrowingStream.makeStream(of: String.self, throwing: Error.self)
continuation.yield("hello")

var iterator = stream.makeAsyncIterator()
do {
expectEqual(try await iterator.next(), "hello")
} catch {
expectUnreachable("unexpected error thrown")
}
}

tests.test("yield with no awaiting next") {
let series = AsyncStream(String.self) { continuation in
_ = AsyncStream(String.self) { continuation in
continuation.yield("hello")
}
}

tests.test("yield with no awaiting next throwing") {
let series = AsyncThrowingStream(String.self) { continuation in
_ = AsyncThrowingStream(String.self) { continuation in
continuation.yield("hello")
}
}
Expand Down Expand Up @@ -122,7 +138,7 @@ var tests = TestSuite("AsyncStream")
do {
expectEqual(try await iterator.next(), "hello")
expectEqual(try await iterator.next(), "world")
try await iterator.next()
_ = try await iterator.next()
expectUnreachable("expected thrown error")
} catch {
if let failure = error as? SomeError {
Expand All @@ -134,15 +150,15 @@ var tests = TestSuite("AsyncStream")
}

tests.test("yield with no awaiting next detached") {
let series = AsyncStream(String.self) { continuation in
_ = AsyncStream(String.self) { continuation in
detach {
continuation.yield("hello")
}
}
}

tests.test("yield with no awaiting next detached throwing") {
let series = AsyncThrowingStream(String.self) { continuation in
_ = AsyncThrowingStream(String.self) { continuation in
detach {
continuation.yield("hello")
}
Expand Down Expand Up @@ -246,7 +262,7 @@ var tests = TestSuite("AsyncStream")
do {
expectEqual(try await iterator.next(), "hello")
expectEqual(try await iterator.next(), "world")
try await iterator.next()
_ = try await iterator.next()
expectUnreachable("expected thrown error")
} catch {
if let failure = error as? SomeError {
Expand Down Expand Up @@ -337,7 +353,7 @@ var tests = TestSuite("AsyncStream")
let expectation = Expectation()

func scopedLifetime(_ expectation: Expectation) {
let series = AsyncStream(String.self) { continuation in
_ = AsyncStream(String.self) { continuation in
continuation.onTermination = { @Sendable _ in expectation.fulfilled = true }
}
}
Expand All @@ -351,7 +367,7 @@ var tests = TestSuite("AsyncStream")
let expectation = Expectation()

func scopedLifetime(_ expectation: Expectation) {
let series = AsyncStream(String.self) { continuation in
_ = AsyncStream(String.self) { continuation in
continuation.onTermination = { @Sendable _ in expectation.fulfilled = true }
continuation.finish()
}
Expand All @@ -366,7 +382,7 @@ var tests = TestSuite("AsyncStream")
let expectation = Expectation()

func scopedLifetime(_ expectation: Expectation) {
let series = AsyncStream(String.self) { continuation in
_ = AsyncStream(String.self) { continuation in
continuation.onTermination = { @Sendable terminal in
switch terminal {
case .cancelled:
Expand All @@ -386,7 +402,7 @@ var tests = TestSuite("AsyncStream")
let expectation = Expectation()

func scopedLifetime(_ expectation: Expectation) {
let series = AsyncThrowingStream(String.self) { continuation in
_ = AsyncThrowingStream(String.self) { continuation in
continuation.onTermination = { @Sendable terminal in
switch terminal {
case .cancelled:
Expand Down

0 comments on commit 26a089e

Please sign in to comment.