Skip to content

Commit

Permalink
Resolve all outstanding conformance opt-outs (#271)
Browse files Browse the repository at this point in the history
This PR includes updates to fix all outstanding conformance test
failures and removes the corresponding opt-outs. All changes are
validated by the conformance test suite.

Many of these fixes are similar to
connectrpc/connect-kotlin#248 and
connectrpc/connect-kotlin#274.

Resolves #268.
Resolves #269.
Resolves #270.

This should also be one of the final changes before v1.0
#222.

---------

Signed-off-by: Michael Rebello <me@michaelrebello.com>
  • Loading branch information
rebello95 authored Jun 8, 2024
1 parent 12a86a1 commit 6d0f8ed
Show file tree
Hide file tree
Showing 30 changed files with 555 additions and 170 deletions.
2 changes: 1 addition & 1 deletion Examples/ElizaCocoaPodsApp/Podfile.lock
Original file line number Diff line number Diff line change
Expand Up @@ -20,4 +20,4 @@ SPEC CHECKSUMS:

PODFILE CHECKSUM: b598f373a6ab5add976b09c2ac79029bf2200d48

COCOAPODS: 1.13.0
COCOAPODS: 1.15.2
72 changes: 59 additions & 13 deletions Libraries/Connect/Internal/Interceptors/ConnectInterceptor.swift
Original file line number Diff line number Diff line change
Expand Up @@ -83,28 +83,51 @@ extension ConnectInterceptor: UnaryInterceptor {
] = current.value
})

if let encoding = response.headers[HeaderConstants.contentEncoding]?.first,
let compressionPool = self.config.responseCompressionPool(forName: encoding),
let message = response.message.flatMap({ try? compressionPool.decompress(data: $0) })
let finalResponse: HTTPResponse
let contentType = response.headers[HeaderConstants.contentType]?.first ?? ""
if response.code == .ok && !contentType.hasPrefix("application/\(self.config.codec.name())")
{
proceed(HTTPResponse(
code: response.code,
headers: headers,
message: message,
trailers: trailers,
error: response.error,
// If content-type looks like it could be an RPC server's response, consider
// this an internal error.
let code: Code = contentType.hasPrefix("application/") ? .internalError : .unknown
finalResponse = HTTPResponse(
code: code, headers: headers, message: nil, trailers: trailers,
error: ConnectError(code: code, message: "unexpected content-type: \(contentType)"),
tracingInfo: response.tracingInfo
))
)
} else if let encoding = response.headers[HeaderConstants.contentEncoding]?.first {
if let compressionPool = self.config.responseCompressionPool(forName: encoding),
let message = response.message.flatMap({ try? compressionPool.decompress(data: $0) })
{
finalResponse = HTTPResponse(
code: response.code,
headers: headers,
message: message,
trailers: trailers,
error: response.error,
tracingInfo: response.tracingInfo
)
} else {
finalResponse = HTTPResponse(
code: .internalError,
headers: headers,
message: nil,
trailers: trailers,
error: ConnectError(code: .internalError, message: "unexpected encoding"),
tracingInfo: response.tracingInfo
)
}
} else {
proceed(HTTPResponse(
finalResponse = HTTPResponse(
code: response.code,
headers: headers,
message: response.message,
trailers: trailers,
error: response.error,
tracingInfo: response.tracingInfo
))
)
}
proceed(finalResponse)
}
}

Expand Down Expand Up @@ -146,13 +169,36 @@ extension ConnectInterceptor: StreamInterceptor {
switch result {
case .headers(let headers):
self.streamResponseHeaders.value = headers
proceed(result)

let contentType = headers[HeaderConstants.contentType]?.first ?? ""
if contentType != "application/connect+\(self.config.codec.name())" {
// If content-type looks like it could be an RPC server's response, consider
// this an internal error.
let code: Code = contentType.hasPrefix("application/connect+")
? .internalError
: .unknown
proceed(.complete(
code: code, error: ConnectError(
code: code, message: "unexpected content-type: \(contentType)"
), trailers: nil
))
} else {
proceed(result)
}
case .message(let data):
do {
let responseCompressionPool = self.streamResponseHeaders.value?[
HeaderConstants.connectStreamingContentEncoding
]?.first.flatMap { self.config.responseCompressionPool(forName: $0) }
if responseCompressionPool == nil && Envelope.isCompressed(data) {
proceed(.complete(
code: .internalError, error: ConnectError(
code: .internalError, message: "received unexpected compressed message"
), trailers: nil
))
return
}

let (headerByte, message) = try Envelope.unpackMessage(
data, compressionPool: responseCompressionPool
)
Expand Down
99 changes: 86 additions & 13 deletions Libraries/Connect/Internal/Interceptors/GRPCWebInterceptor.swift
Original file line number Diff line number Diff line change
Expand Up @@ -61,12 +61,39 @@ extension GRPCWebInterceptor: UnaryInterceptor {
response.headers,
trailers: response.trailers
)
if grpcCode != .ok || connectError != nil {
proceed(HTTPResponse(
// Rewrite the gRPC code if it is "ok" but `connectError` is non-nil.
code: grpcCode == .ok ? .unknown : grpcCode,
headers: response.headers,
message: response.message,
trailers: response.trailers,
error: connectError,
tracingInfo: response.tracingInfo
))
} else {
proceed(HTTPResponse(
code: .unimplemented,
headers: response.headers,
message: response.message,
trailers: response.trailers,
error: ConnectError(
code: .unimplemented, message: "unary response has no message"
),
tracingInfo: response.tracingInfo
))
}
return
}

let contentType = response.headers[HeaderConstants.contentType]?.first ?? ""
if response.code == .ok && !self.contentTypeIsExpectedGRPCWeb(contentType) {
// If content-type looks like it could be a gRPC server's response, consider
// this an internal error.
let code: Code = self.contentTypeIsGRPCWeb(contentType) ? .internalError : .unknown
proceed(HTTPResponse(
code: grpcCode,
headers: response.headers,
message: response.message,
trailers: response.trailers,
error: connectError,
code: code, headers: response.headers, message: nil, trailers: response.trailers,
error: ConnectError(code: code, message: "unexpected content-type: \(contentType)"),
tracingInfo: response.tracingInfo
))
return
Expand All @@ -75,6 +102,18 @@ extension GRPCWebInterceptor: UnaryInterceptor {
let compressionPool = response.headers[HeaderConstants.grpcContentEncoding]?
.first
.flatMap { self.config.responseCompressionPool(forName: $0) }
if compressionPool == nil && Envelope.isCompressed(responseData) {
proceed(HTTPResponse(
code: .internalError, headers: response.headers, message: nil,
trailers: response.trailers,
error: ConnectError(
code: .internalError, message: "received unexpected compressed message"
),
tracingInfo: response.tracingInfo
))
return
}

do {
// gRPC Web returns data in 2 chunks (either/both of which may be compressed):
// 1. OPTIONAL (when not trailers-only): The (headers and length prefixed)
Expand Down Expand Up @@ -107,7 +146,7 @@ extension GRPCWebInterceptor: UnaryInterceptor {
}
} catch let error {
proceed(HTTPResponse(
code: .unknown,
code: .unimplemented,
headers: response.headers,
message: response.message,
trailers: response.trailers,
Expand Down Expand Up @@ -146,6 +185,19 @@ extension GRPCWebInterceptor: StreamInterceptor {
) {
switch result {
case .headers(let headers):
let contentType = headers[HeaderConstants.contentType]?.first ?? ""
if !self.contentTypeIsExpectedGRPCWeb(contentType) {
// If content-type looks like it could be a gRPC server's response, consider
// this an internal error.
let code: Code = self.contentTypeIsGRPCWeb(contentType) ? .internalError : .unknown
proceed(.complete(
code: code, error: ConnectError(
code: code, message: "unexpected content-type: \(contentType)"
), trailers: headers
))
return
}

if let grpcCode = headers.grpcStatus() {
// Headers-only response.
proceed(.complete(
Expand Down Expand Up @@ -193,9 +245,20 @@ extension GRPCWebInterceptor: StreamInterceptor {
proceed(result)
}
}
}

// MARK: - Private
// MARK: - Private

private func contentTypeIsGRPCWeb(_ contentType: String) -> Bool {
return contentType == "application/grpc-web"
|| contentType.hasPrefix("application/grpc-web+")
}

private func contentTypeIsExpectedGRPCWeb(_ contentType: String) -> Bool {
let codecName = self.config.codec.name()
return (codecName == "proto" && contentType == "application/grpc-web")
|| contentType == "application/grpc-web+\(codecName)"
}
}

private struct TrailersDecodingError: Error {}

Expand Down Expand Up @@ -228,13 +291,23 @@ private extension Trailers {
private extension HTTPResponse {
func withHandledGRPCWebTrailers(_ trailers: Trailers, message: Data?) -> Self {
let (grpcCode, error) = ConnectError.parseGRPCHeaders(self.headers, trailers: trailers)
if grpcCode == .ok {
if grpcCode != .ok || error != nil {
return HTTPResponse(
code: grpcCode,
// Rewrite the gRPC code if it is "ok" but `connectError` is non-nil.
code: grpcCode == .ok ? .unknown : grpcCode,
headers: self.headers,
message: message,
message: nil,
trailers: trailers,
error: nil,
error: error,
tracingInfo: self.tracingInfo
)
} else if message?.isEmpty != false {
return HTTPResponse(
code: .unimplemented,
headers: self.headers,
message: nil,
trailers: trailers,
error: ConnectError(code: .unimplemented, message: "unary response has no message"),
tracingInfo: self.tracingInfo
)
} else {
Expand All @@ -243,7 +316,7 @@ private extension HTTPResponse {
headers: self.headers,
message: message,
trailers: trailers,
error: error,
error: nil,
tracingInfo: self.tracingInfo
)
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,11 +14,14 @@

import SwiftProtobuf

/// Concrete implementation of `BidirectionalAsyncStreamInterface`.
/// Concrete **internal** implementation of `BidirectionalAsyncStreamInterface`.
/// Provides the necessary wiring to bridge from closures/callbacks to Swift's `AsyncStream`
/// to work with async/await.
///
/// If the library removes callback support in favor of only supporting async/await in the future,
/// this class can be simplified.
@available(iOS 13, *)
final class BidirectionalAsyncStream<
class BidirectionalAsyncStream<
Input: ProtobufMessage, Output: ProtobufMessage
>: @unchecked Sendable {
/// The underlying async stream that will be exposed to the consumer.
Expand Down Expand Up @@ -71,10 +74,10 @@ final class BidirectionalAsyncStream<
}

/// Send a result to the consumer over the `results()` `AsyncStream`.
/// Should be called by the protocol client when a result is received.
/// Should be called by the protocol client when a result is received from the network.
///
/// - parameter result: The new result that was received.
func receive(_ result: StreamResult<Output>) {
func handleResultFromServer(_ result: StreamResult<Output>) {
self.receiveResult(result)
}
}
Expand Down Expand Up @@ -103,7 +106,3 @@ extension BidirectionalAsyncStream: BidirectionalAsyncStreamInterface {
self.requestCallbacks?.cancel()
}
}

// Conforms to the client-only interface since it matches exactly and the implementation is internal
@available(iOS 13, *)
extension BidirectionalAsyncStream: ClientOnlyAsyncStreamInterface {}
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@

import SwiftProtobuf

/// Concrete implementation of `BidirectionalStreamInterface`.
/// Concrete **internal** implementation of `BidirectionalStreamInterface`.
final class BidirectionalStream<Message: ProtobufMessage>: Sendable {
private let requestCallbacks: RequestCallbacks<Message>

Expand All @@ -40,6 +40,3 @@ extension BidirectionalStream: BidirectionalStreamInterface {
self.requestCallbacks.cancel()
}
}

// Conforms to the client-only interface since it matches exactly and the implementation is internal
extension BidirectionalStream: ClientOnlyStreamInterface {}
50 changes: 50 additions & 0 deletions Libraries/Connect/Internal/Streaming/ClientOnlyAsyncStream.swift
Original file line number Diff line number Diff line change
@@ -0,0 +1,50 @@
// Copyright 2022-2024 The Connect Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

import Foundation

/// Concrete **internal** implementation of `ClientOnlyAsyncStreamInterface`.
/// Provides the necessary wiring to bridge from closures/callbacks to Swift's `AsyncStream`
/// to work with async/await.
///
/// This subclasses `BidirectionalAsyncStream` since its behavior is purely additive (it overlays
/// some additional validation) and both types are internal to the package, not public.
@available(iOS 13, *)
final class ClientOnlyAsyncStream<
Input: ProtobufMessage, Output: ProtobufMessage
>: BidirectionalAsyncStream<Input, Output> {
private let receivedResults = Locked([StreamResult<Output>]())

override func handleResultFromServer(_ result: StreamResult<Output>) {
let (isComplete, results) = self.receivedResults.perform { results in
results.append(result)
if case .complete = result {
return (true, ClientOnlyStreamValidation.validatedFinalClientStreamResults(results))
} else {
return (false, [])
}
}
guard isComplete else {
return
}
results.forEach(super.handleResultFromServer)
}
}

@available(iOS 13, *)
extension ClientOnlyAsyncStream: ClientOnlyAsyncStreamInterface {
func closeAndReceive() {
self.close()
}
}
Loading

0 comments on commit 6d0f8ed

Please sign in to comment.