From d1de051dd1652490e75b4309be287d2dd9a29c80 Mon Sep 17 00:00:00 2001 From: Naomi Plasterer Date: Tue, 20 Feb 2024 20:26:10 -0800 Subject: [PATCH 1/3] add all the stream all messaging methods --- Sources/XMTPiOS/Conversations.swift | 95 ++++++++++++++++++++++++++++- 1 file changed, 93 insertions(+), 2 deletions(-) diff --git a/Sources/XMTPiOS/Conversations.swift b/Sources/XMTPiOS/Conversations.swift index 371323af..862877f7 100644 --- a/Sources/XMTPiOS/Conversations.swift +++ b/Sources/XMTPiOS/Conversations.swift @@ -239,7 +239,7 @@ public actor Conversations { return messages } - public func streamAllMessages() async throws -> AsyncThrowingStream { + func streamAllV2Messages() async throws -> AsyncThrowingStream { return AsyncThrowingStream { continuation in Task { while true { @@ -278,8 +278,99 @@ public actor Conversations { } } } + + public func streamAllGroupMessages() -> AsyncThrowingStream { + AsyncThrowingStream { continuation in + Task.detached { + do { + self.streamHolder.stream = try await self.client.v3Client?.conversations().streamAllMessages( + messageCallback: MessageCallback(client: self.client) { message in + do { + continuation.yield(try message.fromFFI(client: self.client)) + } catch { + print("Error onMessage \(error)") + } + } + ) + } catch { + print("STREAM ERR: \(error)") + } + } + } + } + + public func streamAllMessages(includeGroups: Bool = false) async throws -> AsyncThrowingStream { + AsyncThrowingStream { continuation in + @Sendable func forwardStreamToMerged(stream: AsyncThrowingStream) async { + do { + var iterator = stream.makeAsyncIterator() + while let element = try await iterator.next() { + continuation.yield(element) + } + continuation.finish() + } catch { + continuation.finish(throwing: error) + } + } + + Task { + await forwardStreamToMerged(stream: try streamAllV2Messages()) + } + if (includeGroups) { + Task { + await forwardStreamToMerged(stream: streamAllGroupMessages()) + } + } + } + } + + public func streamAllGroupDecryptedMessages() -> AsyncThrowingStream { + AsyncThrowingStream { continuation in + Task.detached { + do { + self.streamHolder.stream = try await self.client.v3Client?.conversations().streamAllMessages( + messageCallback: MessageCallback(client: self.client) { message in + do { + continuation.yield(try message.fromFFIDecrypted(client: self.client)) + } catch { + print("Error onMessage \(error)") + } + } + ) + } catch { + print("STREAM ERR: \(error)") + } + } + } + } + + public func streamAllDecryptedMessages(includeGroups: Bool = false) -> AsyncThrowingStream { + AsyncThrowingStream { continuation in + @Sendable func forwardStreamToMerged(stream: AsyncThrowingStream) async { + do { + var iterator = stream.makeAsyncIterator() + while let element = try await iterator.next() { + continuation.yield(element) + } + continuation.finish() + } catch { + continuation.finish(throwing: error) + } + } + + Task { + await forwardStreamToMerged(stream: try streamAllV2DecryptedMessages()) + } + if (includeGroups) { + Task { + await forwardStreamToMerged(stream: streamAllGroupDecryptedMessages()) + } + } + } + } + - public func streamAllDecryptedMessages() async throws -> AsyncThrowingStream { + func streamAllV2DecryptedMessages() async throws -> AsyncThrowingStream { return AsyncThrowingStream { continuation in Task { while true { From 128e258437e0636d88d0b0f293d070eeb980db81 Mon Sep 17 00:00:00 2001 From: Naomi Plasterer Date: Tue, 20 Feb 2024 20:27:14 -0800 Subject: [PATCH 2/3] bump the pod spec --- XMTP.podspec | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/XMTP.podspec b/XMTP.podspec index e1b4095f..609b0dd7 100644 --- a/XMTP.podspec +++ b/XMTP.podspec @@ -16,7 +16,7 @@ Pod::Spec.new do |spec| # spec.name = "XMTP" - spec.version = "0.8.9" + spec.version = "0.8.10" spec.summary = "XMTP SDK Cocoapod" # This description is used to generate tags and improve search results. From 9c95d1637e2aa20092d3194975aacdb1d88dae17 Mon Sep 17 00:00:00 2001 From: Naomi Plasterer Date: Tue, 20 Feb 2024 20:39:56 -0800 Subject: [PATCH 3/3] add tests for all the new streaming methods --- Sources/XMTPiOS/Conversations.swift | 4 +- Tests/XMTPTests/GroupTests.swift | 68 +++++++++++++++++++++++++++-- 2 files changed, 66 insertions(+), 6 deletions(-) diff --git a/Sources/XMTPiOS/Conversations.swift b/Sources/XMTPiOS/Conversations.swift index 862877f7..db751a24 100644 --- a/Sources/XMTPiOS/Conversations.swift +++ b/Sources/XMTPiOS/Conversations.swift @@ -281,7 +281,7 @@ public actor Conversations { public func streamAllGroupMessages() -> AsyncThrowingStream { AsyncThrowingStream { continuation in - Task.detached { + Task { do { self.streamHolder.stream = try await self.client.v3Client?.conversations().streamAllMessages( messageCallback: MessageCallback(client: self.client) { message in @@ -326,7 +326,7 @@ public actor Conversations { public func streamAllGroupDecryptedMessages() -> AsyncThrowingStream { AsyncThrowingStream { continuation in - Task.detached { + Task { do { self.streamHolder.stream = try await self.client.v3Client?.conversations().streamAllMessages( messageCallback: MessageCallback(client: self.client) { message in diff --git a/Tests/XMTPTests/GroupTests.swift b/Tests/XMTPTests/GroupTests.swift index d4e56f52..b7006464 100644 --- a/Tests/XMTPTests/GroupTests.swift +++ b/Tests/XMTPTests/GroupTests.swift @@ -394,6 +394,7 @@ class GroupTests: XCTestCase { let fixtures = try await localFixtures() let expectation1 = expectation(description: "got a conversation") + expectation1.expectedFulfillmentCount = 2 Task(priority: .userInitiated) { for try await _ in try await fixtures.aliceClient.conversations.streamAll() { @@ -402,23 +403,82 @@ class GroupTests: XCTestCase { } _ = try await fixtures.bobClient.conversations.newGroup(with: [fixtures.alice.address]) - // _ = try await fixtures.bobClient.conversations.newConversation(with: fixtures.alice.address) + _ = try await fixtures.bobClient.conversations.newConversation(with: fixtures.alice.address) await waitForExpectations(timeout: 3) } - func testCanStreamGroupsAndConversationsWorksConvos() async throws { + func testCanStreamAllMessages() async throws { let fixtures = try await localFixtures() let expectation1 = expectation(description: "got a conversation") + expectation1.expectedFulfillmentCount = 2 + let convo = try await fixtures.bobClient.conversations.newConversation(with: fixtures.alice.address) + let group = try await fixtures.bobClient.conversations.newGroup(with: [fixtures.alice.address]) + try await fixtures.aliceClient.conversations.sync() + Task(priority: .userInitiated) { + for try await _ in try await fixtures.aliceClient.conversations.streamAllMessages(includeGroups: true) { + expectation1.fulfill() + } + } + + try await group.send(content: "hi") + try await convo.send(content: "hi") + + await waitForExpectations(timeout: 3) + } + + func testCanStreamAllDecryptedMessages() async throws { + let fixtures = try await localFixtures() + let expectation1 = expectation(description: "got a conversation") + expectation1.expectedFulfillmentCount = 2 + let convo = try await fixtures.bobClient.conversations.newConversation(with: fixtures.alice.address) + let group = try await fixtures.bobClient.conversations.newGroup(with: [fixtures.alice.address]) + try await fixtures.aliceClient.conversations.sync() Task(priority: .userInitiated) { - for try await _ in try await fixtures.aliceClient.conversations.streamAll() { + for try await _ in try await fixtures.aliceClient.conversations.streamAllDecryptedMessages(includeGroups: true) { expectation1.fulfill() } } - _ = try await fixtures.bobClient.conversations.newConversation(with: fixtures.alice.address) + try await group.send(content: "hi") + try await convo.send(content: "hi") + + await waitForExpectations(timeout: 3) + } + + func testCanStreamAllGroupMessages() async throws { + let fixtures = try await localFixtures() + + let expectation1 = expectation(description: "got a conversation") + + let group = try await fixtures.bobClient.conversations.newGroup(with: [fixtures.alice.address]) + try await fixtures.aliceClient.conversations.sync() + Task(priority: .userInitiated) { + for try await _ in try await fixtures.aliceClient.conversations.streamAllGroupMessages() { + expectation1.fulfill() + } + } + + try await group.send(content: "hi") + + await waitForExpectations(timeout: 3) + } + + func testCanStreamAllGroupDecryptedMessages() async throws { + let fixtures = try await localFixtures() + + let expectation1 = expectation(description: "got a conversation") + let group = try await fixtures.bobClient.conversations.newGroup(with: [fixtures.alice.address]) + try await fixtures.aliceClient.conversations.sync() + Task(priority: .userInitiated) { + for try await _ in try await fixtures.aliceClient.conversations.streamAllGroupDecryptedMessages() { + expectation1.fulfill() + } + } + + try await group.send(content: "hi") await waitForExpectations(timeout: 3) }