diff --git a/Tests/ConnectLibraryTests/ConnectConformance/AsyncAwaitConformance.swift b/Tests/ConnectLibraryTests/ConnectConformance/AsyncAwaitConformance.swift index 87b74242..1e268f3f 100644 --- a/Tests/ConnectLibraryTests/ConnectConformance/AsyncAwaitConformance.swift +++ b/Tests/ConnectLibraryTests/ConnectConformance/AsyncAwaitConformance.swift @@ -77,6 +77,37 @@ final class AsyncAwaitConformance: XCTestCase { } } + func testClientStreaming() async throws { + func createPayload(bytes: Int) -> Connectrpc_Conformance_V1_StreamingInputCallRequest { + return .with { request in + request.payload = .with { $0.body = Data(Array(repeating: 1, count: bytes)) } + } + } + + try await self.executeTestWithClients { client in + let stream = client.streamingInputCall() + try stream + .send(createPayload(bytes: 250 * 1_024)) + .send(createPayload(bytes: 8)) + .send(createPayload(bytes: 1_024)) + .send(createPayload(bytes: 32 * 1_024)) + .close() + + var responseCount = 0 + for await result in stream.results() { + switch result { + case .headers, .complete: + continue + + case .message(let output): + responseCount += 1 + XCTAssertEqual(output.aggregatedPayloadSize, 289_800) + } + } + XCTAssertEqual(responseCount, 1) + } + } + func testServerStreaming() async throws { try await self.executeTestWithClients { client in let sizes = [31_415, 9, 2_653, 58_979] diff --git a/Tests/ConnectLibraryTests/ConnectConformance/CallbackConformance.swift b/Tests/ConnectLibraryTests/ConnectConformance/CallbackConformance.swift index 28de45e5..62c25313 100644 --- a/Tests/ConnectLibraryTests/ConnectConformance/CallbackConformance.swift +++ b/Tests/ConnectLibraryTests/ConnectConformance/CallbackConformance.swift @@ -83,6 +83,43 @@ final class CallbackConformance: XCTestCase { } } + func testClientStreaming() throws { + func createPayload(bytes: Int) -> Connectrpc_Conformance_V1_StreamingInputCallRequest { + return .with { request in + request.payload = .with { $0.body = Data(Array(repeating: 1, count: bytes)) } + } + } + + try self.executeTestWithClients { client in + let responseSize = Locked(0) + let expectation = self.expectation(description: "Stream completes") + let stream = client.streamingInputCall { result in + switch result { + case .headers: + break + + case .message(let output): + responseSize.value = output.aggregatedPayloadSize + + case .complete(let code, let error, _): + XCTAssertEqual(code, .ok) + XCTAssertNil(error) + expectation.fulfill() + } + } + + try stream + .send(createPayload(bytes: 250 * 1_024)) + .send(createPayload(bytes: 8)) + .send(createPayload(bytes: 1_024)) + .send(createPayload(bytes: 32 * 1_024)) + .close() + + XCTAssertEqual(XCTWaiter().wait(for: [expectation], timeout: kTimeout), .completed) + XCTAssertEqual(responseSize.value, 289_800) + } + } + func testServerStreaming() throws { try self.executeTestWithClients { client in let sizes = [31_415, 9, 2_653, 58_979]