Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Subject equivalent? #176

Open
ursusursus opened this issue Jul 10, 2022 · 19 comments
Open

Subject equivalent? #176

ursusursus opened this issue Jul 10, 2022 · 19 comments
Labels
Future Unscheduled work for future releases

Comments

@ursusursus
Copy link

We need a way to imperatively pipe events into a AsyncSequence same asi Subjects from Combine did, or Kotlin's MutableState/SharedFlow

AsynchChannel feels like a low level primitive to be used for state tracking the way CurrentValueSubject was

@bioche
Copy link

bioche commented Jul 14, 2022

This would be a nice addition as it's a tool vastly used in Rx as well (BehaviorSubjects, PublishSubjects, BehaviorRelays, PublishRelays etc...) I guess for the time being we could build one using continuations with something in this flavour.

class BehaviorRelay<V: Sendable> {
    
    init(_ initialValue: V) {
        self.currentValue = initialValue
        self.continuations = []
    }
    
    var currentValue: V
    var continuations: [AsyncStream<V>.Continuation]
    
    let lock = NSLock()
    
    func accept(newValue: V) {
        lock.lock()
        defer { lock.unlock() }
        currentValue = newValue
        continuations.forEach { continuation in
            continuation.yield(newValue)
        }
    }
    
    func makeStream() -> AsyncStream<V> {
        AsyncStream { continuation in
            lock.lock()
            defer { lock.unlock() }
            continuation.yield(currentValue)
            continuations.append(continuation)
        }
    }
}

@ursusursus
Copy link
Author

If the Relay itself would be a asyncsequence, would be great

@bioche
Copy link

bioche commented Oct 9, 2022

Any news on integrating a subject equivalent in the lib at this time ? @phausler maybe ^^

@phausler
Copy link
Member

phausler commented Oct 9, 2022

AsyncChannel serves this purpose to some extent (it is not a current value but a pass through subject style behavior with back pressure)

@FranzBusch
Copy link
Member

The thing we are still missing is a multicast or share algorithm. Right now the focus, as @phausler pointed out in some other issues, is to the get the set of algos that are in here in a stable state and through evolution.

rustle added a commit to rustle/swift-async-algorithms that referenced this issue Oct 10, 2022
@rustle
Copy link
Contributor

rustle commented Oct 10, 2022

I put up a draft PR #208 and am hoping to fill out all the placeholders with a more polished version of the approach I'm using here where I am wrapping an AsyncStream and hosting it in a Task to ensure sequencing (relative to caller) https://github.com/rustle/TaskHostedAsyncSequence/tree/main/Sources/TaskHostedAsyncSequence. Would love to collaborate with any of y'all on this.

rustle added a commit to rustle/swift-async-algorithms that referenced this issue Oct 11, 2022
@rustle
Copy link
Contributor

rustle commented Oct 11, 2022

Pushed up a working (so far) AsyncSubject. I'll work on AsyncThrowingSubject next. Then I'll flesh out tests for both.

@twittemb
Copy link
Contributor

Hi @rustle

I allow my self to post a comment here since I’ve made the same kind of implementation in a pitch on the forum -> https://forums.swift.org/t/pitch-async-buffered-channel/59854

The outcome seemed to be that it was too close to AsyncStream and that having a factory function like https://github.com/sideeffect-io/AsyncExtensions/blob/main/Sources/Creators/AsyncStream%2BPipe.swift should be enough (and could be pitched to the standard library).

@rustle
Copy link
Contributor

rustle commented Oct 11, 2022

Hi @rustle

I allow my self to post a comment here since I’ve made the same kind of implementation in a pitch on the forum -> https://forums.swift.org/t/pitch-async-buffered-channel/59854

The outcome seemed to be that it was too close to AsyncStream and that having a factory function like https://github.com/sideeffect-io/AsyncExtensions/blob/main/Sources/Creators/AsyncStream%2BPipe.swift should be enough (and could be pitched to the standard library).

Cool. I've seen a few versions floating around and will definitely check out yours.

rustle added a commit to rustle/swift-async-algorithms that referenced this issue Oct 13, 2022
@DagAgren
Copy link

I would warmly recommend considering adopting the semantics of ReactiveSwift's Property/MutableProperty here, which I found vastly superior to anything I've seen in Rx or Combine.

Basically, a Property has a value, which in the case of MutableProperty can also be explicitly set. However, Property itself offers all the stream operators like map and combineLatest and so on, and those return a new Property.

This means that you can do things like:

let a = MutableProperty(0)
let b = MutableProperty(0)
let c: Property<Int> = Property.combineLatest(a, b).map { $0 + $1 }

print("\(c.value)") // Prints 0.

a.value = 1
b.value = 2

print("\(c.value)") // Prints 3.

While also allowing you to stream the values of all of a, b and c as they update. This allows you to mix and match reactive and imperative programming as you prefer, which is massively convenient in lots of situations.

@malhal
Copy link

malhal commented Dec 16, 2022

I've been experimenting with this AsyncSubject from ConcurrencyPlus, which essentially is a convenience wrapper for this annoyance:

var continuation: AsyncStream<Int>.Continuation!
let stream = AsyncStream<Int> {
    continuation = $0
}

When using it I've noticed a difference in behaviour in combineLatest compared to Combine's and currently debugging where the problem lies. The problem is if combining 2 streams and multiple values are sent to the first stream, when the second stream receives its first value the for await loops for all the previous first stream values instead of just once for the latest pair.

Edit: its because the linked AsyncSubject uses AsyncStream that buffers, even setting .bufferingNewest(0) or .bufferingOldest(0) didn't fix it. AsyncChannel also buffers. I think I need something equivalent to Combine's PassthroughSubject. Edit: I was wrong, I don't, using AsyncStream requires designing pipelines backwards compared to Combine.

@justin-foreflight
Copy link

justin-foreflight commented Feb 23, 2023

Here is my attempt at an AsyncCurrentValueSubject loosely based on code found in this repository:

public final class AsyncCurrentValueSubject<Element: Sendable>: AsyncSequence, Sendable {
    // MARK: - AsyncSequence
    
    public struct Iterator: AsyncIteratorProtocol, @unchecked Sendable {
        private let id: UInt64
        private let subject: AsyncCurrentValueSubject<Element>
        private var finished: Bool = false
        
        fileprivate init(id: UInt64, subject: AsyncCurrentValueSubject<Element>) {
            self.id = id
            self.subject = subject
        }
        
        public mutating func next() async -> Element? {
            if finished {
                return nil
            }
            
            guard let element = await subject.next(id: id) else {
                finished = true
                return nil
            }
            
            return element
        }
    }
    
    public func makeAsyncIterator() -> Iterator {
        return Iterator(id: generateId(), subject: self)
    }
    
    
    // MARK: - Public interface
    
    public init(_ element: Element) {
        self.state = .init(.iterating(element: element, updated: [], suspended: [:], cancelled: []))
    }
    
    public func send(_ element: Element) {
        for continuation in stateMachineSend(element: element) {
            continuation.resume(returning: element)
        }
    }
    
    public func finish() {
        for continuation in stateMachineFinish() {
            continuation.resume(returning: nil)
        }
    }
    
    
    // MARK: - Implementation details
    
    private let ids = ManagedCriticalState<UInt64>(0)
    
    private func generateId() -> UInt64 {
        return ids.withCriticalRegion { nextId in
            defer { nextId &+= 1 }
            return nextId
        }
    }
    
    fileprivate enum State {
        case finished
        case iterating(element: Element, updated: Set<UInt64>, suspended: [UInt64 : UnsafeContinuation<Element?, Never>], cancelled: Set<UInt64>)
    }
    
    private let state: ManagedCriticalState<State>
    
    private func next(id: UInt64) async -> Element? {
        let (shouldReturn, element) = stateMachineNextImmediate(id: id)
        
        if shouldReturn {
            return element
        }
        
        return await withTaskCancellationHandler {
            await withUnsafeContinuation { continuation in
                let (continuation, element) = stateMachineNextSuspended(id: id, continuation: continuation)
                continuation?.resume(returning: element)
            }
        } onCancel: {
            cancel(id: id)
        }
    }
    
    private func cancel(id: UInt64) {
        let continuation = stateMachineCancel(id: id)
        continuation?.resume(returning: nil)
    }
    
    private func stateMachineSend(element: Element) -> [UnsafeContinuation<Element?, Never>] {
        return state.withCriticalRegion { state -> [UnsafeContinuation<Element?, Never>] in
            switch state {
            case .finished:
                return []
                
            case .iterating(_, _, var suspended, let cancelled):
                let suspendedIds = Set(suspended.keys)
                let suspendedContinuations = Array(suspended.values)
                suspended.removeAll()
                
                state = .iterating(element: element, updated: suspendedIds, suspended: suspended, cancelled: cancelled)
                
                return suspendedContinuations
            }
        }
    }
    
    private func stateMachineFinish() -> [UnsafeContinuation<Element?, Never>] {
        return state.withCriticalRegion { state -> [UnsafeContinuation<Element?, Never>] in
            switch state {
            case .finished:
                return []
                
            case .iterating(_, _, let suspended, _):
                let suspendedContinuations = Array(suspended.values)
                
                state = .finished
                
                return suspendedContinuations
            }
        }
    }
    
    private func stateMachineNextImmediate(id: UInt64) -> (shouldReturn: Bool, element: Element?) {
        return state.withCriticalRegion { state -> (Bool, Element?) in
            switch state {
            case .finished:
                return (true, nil)
                
            case .iterating(let element, var updated, let suspended, var cancelled):
                precondition(suspended[id] == nil)
                let suspendedIds = Set(suspended.keys)
                precondition(updated.intersection(suspendedIds).isEmpty)
                precondition(updated.intersection(cancelled).isEmpty)
                precondition(suspendedIds.intersection(cancelled).isEmpty)
                
                if let _ = cancelled.remove(id) {
                    state = .iterating(element: element, updated: updated, suspended: suspended, cancelled: cancelled)
                    
                    return (true, nil)
                }
                else if updated.contains(id) {
                    return (false, nil)
                }
                else {
                    updated.insert(id)
                    
                    state = .iterating(element: element, updated: updated, suspended: suspended, cancelled: cancelled)
                    
                    return (true, element)
                }
            }
        }
    }
    
    private func stateMachineNextSuspended(id: UInt64, continuation: UnsafeContinuation<Element?, Never>) -> (UnsafeContinuation<Element?, Never>?, Element?) {
        return state.withCriticalRegion { state -> (UnsafeContinuation<Element?, Never>?, Element?) in
            switch state {
            case .finished:
                return (continuation, nil)
                
            case .iterating(let element, var updated, var suspended, var cancelled):
                precondition(suspended[id] == nil)
                let suspendedIds = Set(suspended.keys)
                precondition(updated.intersection(suspendedIds).isEmpty)
                precondition(updated.intersection(cancelled).isEmpty)
                precondition(suspendedIds.intersection(cancelled).isEmpty)
                
                if let _ = cancelled.remove(id) {
                    state = .iterating(element: element, updated: updated, suspended: suspended, cancelled: cancelled)
                    
                    return (continuation, nil)
                }
                else if let _ = updated.remove(id) {
                    suspended[id] = continuation
                    
                    state = .iterating(element: element, updated: updated, suspended: suspended, cancelled: cancelled)
                    
                    return (nil, nil)
                }
                else {
                    updated.insert(id)
                    
                    state = .iterating(element: element, updated: updated, suspended: suspended, cancelled: cancelled)
                    
                    return (continuation, element)
                }
            }
        }
    }
    
    private func stateMachineCancel(id: UInt64) -> UnsafeContinuation<Element?, Never>? {
        return state.withCriticalRegion { state -> UnsafeContinuation<Element?, Never>? in
            switch state {
            case .finished:
                // finished before cancelled
                return nil
                
            case .iterating(let element, var updated, var suspended, var cancelled):
                precondition(!cancelled.contains(id))
                let suspendedIds = Set(suspended.keys)
                precondition(updated.intersection(suspendedIds).isEmpty)
                precondition(updated.intersection(cancelled).isEmpty)
                precondition(suspendedIds.intersection(cancelled).isEmpty)

                if let _ = updated.remove(id) {
                    cancelled.insert(id)
                    
                    state = .iterating(element: element, updated: updated, suspended: suspended, cancelled: cancelled)
                    
                    return nil
                }
                else if let continuation = suspended.removeValue(forKey: id) {
                    state = .iterating(element: element, updated: updated, suspended: suspended, cancelled: cancelled)
                    
                    return continuation
                }
                else {
                    cancelled.insert(id)
                    
                    state = .iterating(element: element, updated: updated, suspended: suspended, cancelled: cancelled)
                    
                    return nil
                }
            }
        }
    }
}

@FranzBusch FranzBusch added the Future Unscheduled work for future releases label Sep 21, 2023
@ingun37
Copy link

ingun37 commented Dec 12, 2023

I'm curious. Is async-algorithms is a replacement of Combine as reactive programming? Is that why Subject equivalent is needed while Combine already provides them? Or is it unnecessary as concurrency programming library because Subject is concept of reactive programming?

@malhal
Copy link

malhal commented Dec 12, 2023

After using it for a while I've learned it's more like the opposite of Combine. Rather than receive publishers from different places and combine them, instead you just write the code procedurally like normal.

@FranzBusch
Copy link
Member

AsyncSequence the primary building block of this package is using a pull based eventing approach whereas Combine uses a push based model. The other difference here is that AsyncSequence are a language level feature and have first class support via for await in. On the larger topic of Subject like types, there is definitely room for more algorithms in this library to make it possible to model multicast AsyncSequence. In fact, there have been a few different test in various PRs already though the pull based model makes this slightly more interesting to implement and there are a few open implementation questions.

@phausler
Copy link
Member

phausler commented Dec 12, 2023

Combine uses a push based model.

No; this is not true - Combine uses a demand based model; everything is pull based.

@FranzBusch
Copy link
Member

Combine uses a push based model.

No; this is not true - Combine uses a demand based model; everything is pull based.

I was talking about the mechanics how elements are delivered. You are right that demand is pull based in the end.

@realityworks
Copy link

I'm curious. Is async-algorithms is a replacement of Combine as reactive programming? Is that why Subject equivalent is needed while Combine already provides them? Or is it unnecessary as concurrency programming library because Subject is concept of reactive programming?

Apple decided to break all of the nice combine features with their @observable macro.

So now if you want to design a viewModel and pipe values, you need to use the old process of inheriting classes with @observableobject.

The problem is that, using the @observable macro, you cannot do data bindings. Which makes life a complex hell when working with state to view bindings from external sources.

**WORKS **

class AClass: ObservableObject {
...
@Published var aVariable: Bool
...
aPublisher
  .assign(to: &$aVariable)
}

DOES NOT WORK

@Observable class AClass {
...
var aVariable: Bool
...
aPublisher
  .assign(to: &$aVariable)
}

Are there plans to get this Subject equivalent to work?

@jz709u
Copy link

jz709u commented Nov 5, 2024

Any progress on this feature?

Because AsyncChannel doesn't support multiple subscribers.

  func test_multiple_consumers() async {
    let event: AsyncChannel<Bool> = .init()

    let expectation1 = XCTestExpectation(description: "first result")
    let expectation2 = XCTestExpectation(description: "first result")

    Task {
      var results = [Bool]()
      for await result in event {
        results += [result]
        print("results1 \(results)")
        if results == [true, true, false, true] {
          expectation1.fulfill()
        }
      }
    }

    Task {
      var results = [Bool]()
      for await result in event {
        results += [result]
        print("results2 \(results)")
        if results == [true, true, false, true] {
          expectation2.fulfill()
        }
      }
    }
    Task {
      await event.send(true)
      await event.send(true)
      await event.send(false)
      await event.send(true)
    }
    wait(for: [expectation1, expectation2], timeout: 2)
  }

output

results1 [true]
results2 [true]
results1 [true, false]
results2 [true, true]

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
Future Unscheduled work for future releases
Projects
None yet
Development

No branches or pull requests

13 participants