Skip to content

Commit

Permalink
Implement convenience factory methods for Async[Throwing]Stream
Browse files Browse the repository at this point in the history
This is the implementation for swiftlang/swift-evolution#1824
  • Loading branch information
FranzBusch committed Jan 11, 2023
1 parent 93a0a6d commit 2d4155d
Show file tree
Hide file tree
Showing 3 changed files with 116 additions and 3 deletions.
45 changes: 45 additions & 0 deletions stdlib/public/Concurrency/AsyncStream.swift
Original file line number Diff line number Diff line change
Expand Up @@ -349,6 +349,10 @@ public struct AsyncStream<Element> {
}
}
}

init(storage: _Storage) {
self.context = _Context(storage: storage, produce: storage.next)
}
}

@available(SwiftStdlib 5.1, *)
Expand Down Expand Up @@ -428,6 +432,47 @@ extension AsyncStream.Continuation {
}
}

@available(SwiftStdlib 5.1, *)
extension AsyncStream {
/// Struct for the return type of ``AsyncStream/makeStream(elementType:limit:)``.
///
/// This struct contains two properties:
/// 1. The ``continuation`` which should be retained by the producer and is used
/// to yield new elements to the stream and finish it.
/// 2. The ``stream`` which is the actual ``AsyncStream`` and
/// should be passed to the consumer.
@available(SwiftStdlib 5.1, *)
public struct NewStream {
/// The continuation of the ``AsyncStream`` used to yield and finish.
public let continuation: AsyncStream<Element>.Continuation

/// The stream which should be passed to the consumer.
public let stream: AsyncStream<Element>

public init(stream: AsyncStream<Element>, continuation: AsyncStream<Element>.Continuation) {
self.stream = stream
self.continuation = continuation
}
}

/// Initializes a new ``AsyncStream`` and an ``AsyncStream/Continuation``.
///
/// - Parameters:
/// - elementType: The element type of the stream.
/// - limit: The buffering policy that the stream should use.
/// - Returns: A ``NewStream`` struct which contains the stream and its continuation.
@available(SwiftStdlib 5.1, *)
public static func makeStream(
of elementType: Element.Type = Element.self,
bufferingPolicy limit: Continuation.BufferingPolicy = .unbounded
) -> NewStream {
let storage: _Storage = .create(limit: limit)
let stream = AsyncStream<Element>(storage: storage)
let continuation = Continuation(storage: storage)
return .init(stream: stream, continuation: continuation)
}
}

@available(SwiftStdlib 5.1, *)
extension AsyncStream: @unchecked Sendable where Element: Sendable { }
#else
Expand Down
47 changes: 47 additions & 0 deletions stdlib/public/Concurrency/AsyncThrowingStream.swift
Original file line number Diff line number Diff line change
Expand Up @@ -386,6 +386,10 @@ public struct AsyncThrowingStream<Element, Failure: Error> {
}
}
}

init(storage: _Storage) {
self.context = _Context(storage: storage, produce: storage.next)
}
}

@available(SwiftStdlib 5.1, *)
Expand Down Expand Up @@ -473,6 +477,49 @@ extension AsyncThrowingStream.Continuation {
}
}

@available(SwiftStdlib 5.1, *)
extension AsyncThrowingStream {
/// Struct for the return type of ``AsyncThrowingStream/makeStream(elementType:limit:)``.
///
/// This struct contains two properties:
/// 1. The ``continuation`` which should be retained by the producer and is used
/// to yield new elements to the stream and finish it.
/// 2. The ``stream`` which is the actual ``AsyncThrowingStream`` and
/// should be passed to the consumer.
@available(SwiftStdlib 5.1, *)
public struct NewStream {
/// The continuation of the ``AsyncThrowingStream`` used to yield and finish.
public let continuation: AsyncThrowingStream<Element, Failure>.Continuation

/// The stream which should be passed to the consumer.
public let stream: AsyncThrowingStream<Element, Failure>

public init(stream: AsyncThrowingStream<Element, Failure>, continuation: AsyncThrowingStream<Element, Failure>.Continuation) {
self.stream = stream
self.continuation = continuation
}
}

/// Initializes a new ``AsyncThrowingStream`` and an ``AsyncThrowingStream/Continuation``.
///
/// - Parameters:
/// - elementType: The element type of the stream.
/// - failureType: The failure type of the stream.
/// - limit: The buffering policy that the stream should use.
/// - Returns: A ``NewStream`` struct which contains the stream and its continuation.
@available(SwiftStdlib 5.1, *)
public static func makeStream(
of elementType: Element.Type = Element.self,
throwing failureType: Failure.Type = Failure.self,
bufferingPolicy limit: Continuation.BufferingPolicy = .unbounded
) -> NewStream {
let storage: _Storage = .create(limit: limit)
let stream = AsyncThrowingStream<Element, Failure>(storage: storage)
let continuation = Continuation(storage: storage)
return .init(stream: stream, continuation: continuation)
}
}

@available(SwiftStdlib 5.1, *)
extension AsyncThrowingStream: @unchecked Sendable where Element: Sendable { }
#else
Expand Down
27 changes: 24 additions & 3 deletions test/Concurrency/Runtime/async_stream.swift
Original file line number Diff line number Diff line change
Expand Up @@ -7,9 +7,6 @@
// rdar://78109470
// UNSUPPORTED: back_deployment_runtime

// Race condition
// REQUIRES: rdar78033828

import _Concurrency
import StdlibUnittest
import Dispatch
Expand All @@ -27,6 +24,30 @@ var tests = TestSuite("AsyncStream")
var fulfilled = false
}

tests.test("factory method") {
let newStream = AsyncStream.makeStream(of: String.self)
newStream.continuation.yield("hello")

var iterator = newStream.stream.makeAsyncIterator()
do {
expectEqual(try await iterator.next(), "hello")
} catch {
expectUnreachable("unexpected error thrown")
}
}

tests.test("throwing factory method") {
let newStream = AsyncThrowingStream.makeStream(of: String.self, throwing: Error.self)
newStream.continuation.yield("hello")

var iterator = newStream.stream.makeAsyncIterator()
do {
expectEqual(try await iterator.next(), "hello")
} catch {
expectUnreachable("unexpected error thrown")
}
}

tests.test("yield with no awaiting next") {
let series = AsyncStream(String.self) { continuation in
continuation.yield("hello")
Expand Down

0 comments on commit 2d4155d

Please sign in to comment.