diff --git a/stdlib/public/Concurrency/AsyncStream.swift b/stdlib/public/Concurrency/AsyncStream.swift index 6e65cab5ca724..0687a8a918290 100644 --- a/stdlib/public/Concurrency/AsyncStream.swift +++ b/stdlib/public/Concurrency/AsyncStream.swift @@ -428,6 +428,26 @@ extension AsyncStream.Continuation { } } +@available(SwiftStdlib 5.1, *) +extension AsyncStream { + /// Initializes a new ``AsyncStream`` and an ``AsyncStream/Continuation``. + /// + /// - Parameters: + /// - elementType: The element type of the stream. + /// - limit: The buffering policy that the stream should use. + /// - Returns: A tuple containing the stream and its continuation. The continuation should be passed to the + /// producer while the stream should be passed to the consumer. + @backDeployed(before: SwiftStdlib 5.9) + public static func makeStream( + of elementType: Element.Type = Element.self, + bufferingPolicy limit: Continuation.BufferingPolicy = .unbounded + ) -> (stream: AsyncStream, continuation: AsyncStream.Continuation) { + var continuation: AsyncStream.Continuation! + let stream = AsyncStream(bufferingPolicy: limit) { continuation = $0 } + return (stream: stream, continuation: continuation!) + } +} + @available(SwiftStdlib 5.1, *) extension AsyncStream: @unchecked Sendable where Element: Sendable { } #else diff --git a/stdlib/public/Concurrency/AsyncThrowingStream.swift b/stdlib/public/Concurrency/AsyncThrowingStream.swift index dfbfea5085790..2d25dbef88328 100644 --- a/stdlib/public/Concurrency/AsyncThrowingStream.swift +++ b/stdlib/public/Concurrency/AsyncThrowingStream.swift @@ -473,6 +473,28 @@ extension AsyncThrowingStream.Continuation { } } +@available(SwiftStdlib 5.1, *) +extension AsyncThrowingStream { + /// Initializes a new ``AsyncThrowingStream`` and an ``AsyncThrowingStream/Continuation``. + /// + /// - Parameters: + /// - elementType: The element type of the stream. + /// - failureType: The failure type of the stream. + /// - limit: The buffering policy that the stream should use. + /// - Returns: A tuple containing the stream and its continuation. The continuation should be passed to the + /// producer while the stream should be passed to the consumer. + @backDeployed(before: SwiftStdlib 5.9) + public static func makeStream( + of elementType: Element.Type = Element.self, + throwing failureType: Failure.Type = Failure.self, + bufferingPolicy limit: Continuation.BufferingPolicy = .unbounded + ) -> (stream: AsyncThrowingStream, continuation: AsyncThrowingStream.Continuation) where Failure == Error { + var continuation: AsyncThrowingStream.Continuation! + let stream = AsyncThrowingStream(bufferingPolicy: limit) { continuation = $0 } + return (stream: stream, continuation: continuation!) + } +} + @available(SwiftStdlib 5.1, *) extension AsyncThrowingStream: @unchecked Sendable where Element: Sendable { } #else diff --git a/test/Concurrency/Runtime/async_stream.swift b/test/Concurrency/Runtime/async_stream.swift index ffdc42577dd0a..72e9cc0fed025 100644 --- a/test/Concurrency/Runtime/async_stream.swift +++ b/test/Concurrency/Runtime/async_stream.swift @@ -7,12 +7,8 @@ // rdar://78109470 // UNSUPPORTED: back_deployment_runtime -// Race condition -// REQUIRES: rdar78033828 - import _Concurrency import StdlibUnittest -import Dispatch struct SomeError: Error, Equatable { var value = Int.random(in: 0..<100) @@ -27,14 +23,34 @@ var tests = TestSuite("AsyncStream") var fulfilled = false } + tests.test("factory method") { + let (stream, continuation) = AsyncStream.makeStream(of: String.self) + continuation.yield("hello") + + var iterator = stream.makeAsyncIterator() + expectEqual(await iterator.next(), "hello") + } + + tests.test("throwing factory method") { + let (stream, continuation) = AsyncThrowingStream.makeStream(of: String.self, throwing: Error.self) + continuation.yield("hello") + + var iterator = stream.makeAsyncIterator() + do { + expectEqual(try await iterator.next(), "hello") + } catch { + expectUnreachable("unexpected error thrown") + } + } + tests.test("yield with no awaiting next") { - let series = AsyncStream(String.self) { continuation in + _ = AsyncStream(String.self) { continuation in continuation.yield("hello") } } tests.test("yield with no awaiting next throwing") { - let series = AsyncThrowingStream(String.self) { continuation in + _ = AsyncThrowingStream(String.self) { continuation in continuation.yield("hello") } } @@ -122,7 +138,7 @@ var tests = TestSuite("AsyncStream") do { expectEqual(try await iterator.next(), "hello") expectEqual(try await iterator.next(), "world") - try await iterator.next() + _ = try await iterator.next() expectUnreachable("expected thrown error") } catch { if let failure = error as? SomeError { @@ -134,7 +150,7 @@ var tests = TestSuite("AsyncStream") } tests.test("yield with no awaiting next detached") { - let series = AsyncStream(String.self) { continuation in + _ = AsyncStream(String.self) { continuation in detach { continuation.yield("hello") } @@ -142,7 +158,7 @@ var tests = TestSuite("AsyncStream") } tests.test("yield with no awaiting next detached throwing") { - let series = AsyncThrowingStream(String.self) { continuation in + _ = AsyncThrowingStream(String.self) { continuation in detach { continuation.yield("hello") } @@ -246,7 +262,7 @@ var tests = TestSuite("AsyncStream") do { expectEqual(try await iterator.next(), "hello") expectEqual(try await iterator.next(), "world") - try await iterator.next() + _ = try await iterator.next() expectUnreachable("expected thrown error") } catch { if let failure = error as? SomeError { @@ -337,7 +353,7 @@ var tests = TestSuite("AsyncStream") let expectation = Expectation() func scopedLifetime(_ expectation: Expectation) { - let series = AsyncStream(String.self) { continuation in + _ = AsyncStream(String.self) { continuation in continuation.onTermination = { @Sendable _ in expectation.fulfilled = true } } } @@ -351,7 +367,7 @@ var tests = TestSuite("AsyncStream") let expectation = Expectation() func scopedLifetime(_ expectation: Expectation) { - let series = AsyncStream(String.self) { continuation in + _ = AsyncStream(String.self) { continuation in continuation.onTermination = { @Sendable _ in expectation.fulfilled = true } continuation.finish() } @@ -366,7 +382,7 @@ var tests = TestSuite("AsyncStream") let expectation = Expectation() func scopedLifetime(_ expectation: Expectation) { - let series = AsyncStream(String.self) { continuation in + _ = AsyncStream(String.self) { continuation in continuation.onTermination = { @Sendable terminal in switch terminal { case .cancelled: @@ -386,7 +402,7 @@ var tests = TestSuite("AsyncStream") let expectation = Expectation() func scopedLifetime(_ expectation: Expectation) { - let series = AsyncThrowingStream(String.self) { continuation in + _ = AsyncThrowingStream(String.self) { continuation in continuation.onTermination = { @Sendable terminal in switch terminal { case .cancelled: