# [WIP] Draft pitch for share #357

## Conversation

**phausler** (Member):

No description provided.


AsyncAlgorithms will introduce a new extension function on AsyncSequence that provides a shareable asynchronous sequence: one that produces the same values upon iteration from multiple instances of its AsyncIterator. Those iterations can take place in multiple isolations.

When values from a differing isolation cannot be coalesced, the two options available are awaiting (an exertion of back-pressure across the sequences) or buffering (an internal back-pressure to a buffer). Replaying the values from the creation of the sequence onward is a distinctly different behavior and should be considered a separate use case. That leaves the behavioral characteristic of this particular share operation as: sharing a buffer of values starting from the initialization of a new iteration of the sequence. Control over that buffer should then offer options to determine its behavior, similar to the control AsyncStream allows. It should have options to be unbounded, to buffer the oldest count of elements, or to buffer the newest count of elements.
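For illustration, call sites might look like the following. This is only a sketch: it borrows the `share(bufferingPolicy:)` spelling and the `AsyncBufferSequencePolicy` values that appear later in this thread, none of which are final API.

```swift
import AsyncAlgorithms

// Sketch only: `events` stands in for any AsyncSequence of Sendable elements.
let events = [1, 2, 3].async

let unbounded  = events.share(bufferingPolicy: .unbounded)            // never drops elements
let keepOldest = events.share(bufferingPolicy: .bufferingOldest(8))   // once full, drops the newest
let keepLatest = events.share(bufferingPolicy: .bufferingLatest(8))   // once full, drops the oldest
```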
**Member:**
I think we should expose both options as a configuration to share, so a user can choose either to exert back-pressure or to buffer, depending on their use case.


```swift
/// A strategy that handles exhaustion of a buffer’s capacity.
public enum BufferingPolicy: Sendable {
  case unbounded, bufferingOldest(Int), bufferingNewest(Int)
}
```
**Member:**
Extensible enums are not a feature available to packages, so this needs to become a struct with some statics instead, similar to how we have done it in other algorithms such as .buffer in this repo. Additionally, there is little value here in a user switching over this enum.
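A minimal sketch of that struct-with-statics pattern, mirroring `AsyncBufferSequencePolicy` (the names here are illustrative, not proposed API):

```swift
// Sketch only: the enum stays internal, so new policies can be added later
// without source-breaking clients that would otherwise switch over it.
public struct SharingBufferingPolicy: Sendable {
  enum Storage: Sendable {
    case unbounded
    case bufferingOldest(Int)
    case bufferingNewest(Int)
  }
  let storage: Storage

  public static let unbounded = SharingBufferingPolicy(storage: .unbounded)
  public static func bufferingOldest(_ limit: Int) -> SharingBufferingPolicy {
    SharingBufferingPolicy(storage: .bufferingOldest(limit))
  }
  public static func bufferingNewest(_ limit: Int) -> SharingBufferingPolicy {
    SharingBufferingPolicy(storage: .bufferingNewest(limit))
  }
}
```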


A new extension will be added that returns a concrete type representing the share algorithm. This extension will take a buffering policy that determines how the buffer is handled when iterations do not consume at the same rate.

A new AsyncSequence type will be introduced that is explicitly marked as `Sendable`. This annotation signals to the developer that this sequence can be shared and stored. Because the type is intended to be stored, it cannot be returned by the extension as a `some AsyncSequence<Element, Failure> & Sendable`, since that cannot be assigned to a stored property. Additionally, since `AsyncShareSequence` is intended to be stored, it will act as a quasi erasing-barrier to the type information of previous sequences in the chain of algorithms: it will only expose the generic `Element` and `Failure` as part of its public interface, and not the "Base" asynchronous sequence it was created from.
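The public surface would then look roughly like this (a sketch, not the final declaration):

```swift
// Sketch: only Element and Failure are public generic parameters; the Base
// sequence would be held internally (e.g. behind an existential), which is
// what the next comment pushes back on.
public struct AsyncShareSequence<Element: Sendable, Failure: Error>: Sendable {
  // AsyncSequence conformance and storage elided in this sketch.
}
```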
**Member:**
Why do we need to erase the base async sequence here? This is different from other algorithms, and I'm sure it will have performance implications since we most likely need to box the base async sequence in an existential.

I think using opaque return types is the right solution here. Users can store that asynchronous sequence by putting it behind an `any AsyncSequence<Element, Failure>` themselves. This leaves the choice to the user instead of forcing the existential on everyone.
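Concretely, that might look like this (a sketch assuming the opaque-returning `share(bufferingPolicy:)` proposed later in this PR):

```swift
// Sketch: the user erases the opaque type themselves when storage is needed.
final class EventHub: Sendable {
  let events: any AsyncSequence<Int, Never> & Sendable

  init(source: some AsyncSequence<Int, Never> & Sendable) {
    // share(bufferingPolicy:) is the API under discussion in this pitch.
    self.events = source.share(bufferingPolicy: .unbounded)
  }
}
```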


Upon creation of the `Iterator` via `makeAsyncIterator`, a new "side" is constructed to identify the specific iterator interacting with the shared iteration. The first actual work then happens when `next` is invoked.

The next method will first check out, within a critical region, the underlying AsyncIterator of the base. If that succeeds (i.e. no other iteration side has already checked it out), it will invoke the next method of that iterator (forwarding in the actor isolation). If an element is produced, it enqueues the element to the shared buffer, checks the iterator back in, adjusts the index in the buffer, and collects all pending continuations, all within a critical region protected by a mutex. Those continuations are then resumed with the given element.
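In isolation, the checkout/check-in idea might look like the following minimal sketch. The type and member names are assumed, not taken from the PR; `Mutex` from the Synchronization module serves as the critical region.

```swift
import Synchronization

// Sketch: only one side may hold the base iterator at a time; an element it
// produces is published to the shared buffer inside the same critical region.
final class SharedIterationState<Iterator, Element>: @unchecked Sendable {
  private struct State {
    var iterator: Iterator?      // nil while checked out by some side
    var buffer: [Element] = []
  }
  private let state: Mutex<State>

  init(iterator: Iterator) {
    self.state = Mutex(State(iterator: iterator))
  }

  /// Attempts to check the base iterator out; nil means another side has it.
  func checkout() -> Iterator? {
    state.withLock { state -> Iterator? in
      defer { state.iterator = nil }
      return state.iterator
    }
  }

  /// Checks the iterator back in and appends any produced element.
  func checkin(_ iterator: Iterator, producing element: Element?) {
    state.withLock { state in
      state.iterator = iterator
      if let element { state.buffer.append(element) }
    }
  }
}
```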
**Member:**

I don't think this is safe. In particular this section:

> invoke the next method of that iterator (forwarding in the actor isolation)

Async iterators are generally not Sendable, and you are proposing to invoke the next method from potentially different isolation domains. While not concurrent, doing so sequentially might still break internal invariants of the base iterator. Furthermore, calling next directly from one of the iterators of share means that if that next call is cancelled, the base async sequence is cancelled.

Instead, I think we need to take a similar approach as we did in merge: spawn an unstructured task on the first call to next, and make that task the only one that iterates the base asynchronous sequence. This upholds the Sendable constraints of the base and correctly shields the base from being cancelled by any of our share consumers.
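A self-contained sketch of that single-iterating-task shape (names assumed; the real implementation in merge is considerably more involved):

```swift
import Synchronization

// Sketch: exactly one unstructured Task iterates the base, so cancelling a
// consumer's next() never cancels the base asynchronous sequence.
final class SharedSource<Base: AsyncSequence & Sendable>: Sendable
where Base.Element: Sendable {
  private enum IteratingTask: Sendable {
    case pending(Base)
    case running(Task<Void, Never>)
  }
  private let state: Mutex<IteratingTask>

  init(_ base: Base) {
    self.state = Mutex(.pending(base))
  }

  /// Invoked from the first call to next(); subsequent calls are no-ops.
  func startIfNeeded(onElement: @escaping @Sendable (Base.Element) -> Void) {
    state.withLock { state in
      guard case .pending(let base) = state else { return }
      state = .running(Task {
        do {
          for try await element in base { onElement(element) }
        } catch {
          // Forwarding the failure to all sides is elided in this sketch.
        }
      })
    }
  }
}
```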


Then all sides are "drained": continuations are placed into the shared state and resumed when an element becomes available for that position.

Practically, this all means that a given iteration may be "behind" another and can eventually catch up (provided it is within the buffer limit).
**Member:**

I think this is the reason why we need to provide the alternative variant as well, one that exerts back-pressure from the slowest consumer, i.e. next on the base is only called once all currently active iterators of share have consumed the last element. While that is most likely slower, it guarantees the user that every iterator receives every element. On the other hand, with the currently proposed buffer-only solution there is no good way to choose a buffer size that guarantees no elements will ever be dropped.
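The pacing rule for such a variant fits in a few lines (illustrative only; names assumed):

```swift
// Sketch of the core rule of a back-pressure variant: the base's next() is
// only demanded again once every active side has consumed the current element.
struct DemandGate {
  var activeSides: Int
  var consumedCurrent = 0

  /// Returns true when the last remaining side consumes the element,
  /// signalling that the base may be advanced.
  mutating func markConsumed() -> Bool {
    consumedCurrent += 1
    guard consumedCurrent == activeSides else { return false }
    consumedCurrent = 0
    return true
  }
}
```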


## Alternatives considered

[^BufferingPolicy]: It was considered that this policy would be nested inside the `AsyncShareSequence` type. However, since it seems likely to be useful for other types, it makes sense to expose it as a top-level type. If it is determined that a general form of a buffering policy requires additional behaviors, this placement is debatable and it could move back to an interior type, similar to AsyncStream.
**Member:**
FWIW we already have a type that is pretty similar called `AsyncBufferSequencePolicy`. IMO we should stay somewhat consistent across the package.

```swift
init(_ base: Base, bufferingPolicy: AsyncBufferSequencePolicy) {
  self.iteratingTask = .pending(base)
  switch bufferingPolicy.policy {
  case .bounded: self.storagePolicy = .unbounded
```
**Contributor:**
this mapping seems rather counterintuitive

**Member Author:**
yea... when the buffering policy is set to bounded, it means the storage itself is unbounded but the limit is the bound; previously I split out the external parameter of the policy, and this is a consequence of reusing the existing AsyncBufferSequencePolicy.

**Member Author:**
perhaps deserves a comment explaining why

```swift
let result: Result<Element?, Failure> = await withTaskCancellationHandler {
  await withCheckedContinuation { continuation in
    let (res, limitContinuation, demandContinuation, cancelled) = state.withLock { state -> (Result<Element?, Failure>?, CheckedContinuation<Bool, Never>?, CheckedContinuation<Void, Never>?, Bool) in
      let side = state.sides[id]!
```
**Contributor:**
if the cancel handler races with this and wins, will this crash?

**Member Author:**
hmm, I will have to dig into how to address that - good catch btw; I don't think it is impossible to address, just an oversight on my part

```swift
      state.iteratingTask = .running(task)
    }
  }
  let result: Result<Element?, Failure> = await withTaskCancellationHandler {
```
**jamieQ** (Contributor), Aug 3, 2025:

I haven't actually tested to verify, but I suspect the cancel handler body is currently subject to this executor hopping issue if the isolated actor parameter isn't captured in the closure. I'm not sure it would be a correctness issue either way, but presumably the isolation should be forwarded so there's no need to risk hopping for no real reason, potentially just as a defensive measure to reduce the risk of surprising behavior.

```swift
import Synchronization

@available(macOS 26.0, *)
extension AsyncSequence where Element: Sendable {
```
**Member:**
Doesn't the Base itself need to be Sendable here since we move it into an unstructured task?

```swift
@available(macOS 26.0, *)
extension AsyncSequence where Element: Sendable {
  @available(macOS 26.0, *) // TODO: fix the availability for this to be defined as @available(AsyncAlgorithms 1.1, *)
  public func share(bufferingPolicy: AsyncBufferSequencePolicy = .unbounded) -> some AsyncSequence<Element, Failure> & Sendable {
```
**Member:**
I don't think this is a good or safe default for all of Swift's use cases. I would suggest we always force users to explicitly pick a policy.

Suggested change:

```diff
-public func share(bufferingPolicy: AsyncBufferSequencePolicy = .unbounded) -> some AsyncSequence<Element, Failure> & Sendable {
+public func share(bufferingPolicy: AsyncBufferSequencePolicy) -> some AsyncSequence<Element, Failure> & Sendable {
```

```swift
  }
}

final class Iteration: Sendable {
```
**Member:**
Could we try to use the pattern established in merge, zip, and AsyncChannel and write an explicit state machine with distinct methods and actions? This would make reviewing this code significantly easier.
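For reference, that pattern looks roughly like this (a minimal sketch, not code from merge or zip): each lock-protected mutation is a method returning an action, and the caller performs the side effects (such as resuming continuations) outside the lock.

```swift
// Sketch of the state-machine style used elsewhere in the package.
struct ShareStateMachine<Element: Sendable> {
  enum NextAction {
    case returnBuffered(Element)  // caller resumes with this element
    case suspend                  // caller parks a continuation
  }
  private var buffer: [Element] = []

  mutating func next() -> NextAction {
    if buffer.isEmpty { return .suspend }
    return .returnBuffered(buffer.removeFirst())
  }

  mutating func elementProduced(_ element: Element) {
    buffer.append(element)
  }
}
```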

```swift
final class Iteration: Sendable {
  // this is the swapped state of transferring the base to the iterating task
  // it does send the Base... but only one transfer
  enum IteratingTask: @unchecked Sendable {
```
**Member:**
This goes to my comment above. This @unchecked Sendable conformance is probably needed because Base is not Sendable. In general, we should not need any unchecked conformances.
