Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Sendability warnings #417

Merged
merged 13 commits into from
Nov 1, 2023
3 changes: 1 addition & 2 deletions Sources/NIOHPACK/HPACKErrors.swift
Original file line number Diff line number Diff line change
Expand Up @@ -71,11 +71,10 @@ public enum NIOHPACKErrors {
/// A dynamic table size update was found outside its allowed place.
/// They may only be included at the start of a header block.
public struct IllegalDynamicTableSizeChange : NIOHPACKError {}

/// A new header could not be added to the dynamic table. Usually
/// this means the header itself is larger than the current
/// dynamic table size.
public struct FailedToAddIndexedHeader<Name: Collection, Value: Collection> : NIOHPACKError where Name.Element == UInt8, Value.Element == UInt8 {
public struct FailedToAddIndexedHeader<Name: Collection & Sendable, Value: Collection & Sendable> : NIOHPACKError where Name.Element == UInt8, Value.Element == UInt8 {
/// The table size required to be able to add this header to the table.
public let bytesNeeded: Int

Expand Down
61 changes: 47 additions & 14 deletions Sources/NIOHTTP2/HTTP2ChannelHandler+InlineStreamMultiplexer.swift
Original file line number Diff line number Diff line change
Expand Up @@ -164,14 +164,16 @@ extension NIOHTTP2Handler {
/// on ``HTTP2Frame/FramePayload`` objects as their base communication
/// atom, as opposed to the regular NIO `SelectableChannel` objects which use `ByteBuffer`
/// and `IOData`.
public struct StreamMultiplexer: @unchecked Sendable {
// '@unchecked Sendable' because this state is not intrinsically `Sendable`
// but it is only accessed in `createStreamChannel` which executes the work on the right event loop
private let inlineStreamMultiplexer: InlineStreamMultiplexer
public struct StreamMultiplexer: Sendable {

private let inlineStreamMultiplexer: InlineStreamMultiplexer.SendableView

private let eventLoop: EventLoop

/// Cannot be created by users.
internal init(_ inlineStreamMultiplexer: InlineStreamMultiplexer) {
self.inlineStreamMultiplexer = inlineStreamMultiplexer
internal init(_ inlineStreamMultiplexer: InlineStreamMultiplexer, eventLoop: EventLoop) {
self.inlineStreamMultiplexer = InlineStreamMultiplexer.SendableView(inlineStreamMultiplexer)
self.eventLoop = eventLoop
}

/// Create a new `Channel` for a new stream initiated by this peer.
Expand All @@ -187,7 +189,13 @@ extension NIOHTTP2Handler {
/// - streamStateInitializer: A callback that will be invoked to allow you to configure the
/// `ChannelPipeline` for the newly created channel.
public func createStreamChannel(promise: EventLoopPromise<Channel>?, _ streamStateInitializer: @escaping StreamInitializer) {
self.inlineStreamMultiplexer.createStreamChannel(promise: promise, streamStateInitializer)
if self.eventLoop.inEventLoop {
self.inlineStreamMultiplexer.createStreamChannel(promise: promise, streamStateInitializer)
} else {
self.eventLoop.execute {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

See my comment on the SendableView.

self.inlineStreamMultiplexer.createStreamChannel(promise: promise, streamStateInitializer)
}
}
}

/// Create a new `Channel` for a new stream initiated by this peer.
Expand All @@ -201,7 +209,9 @@ extension NIOHTTP2Handler {
/// `ChannelPipeline` for the newly created channel.
/// - Returns: An `EventLoopFuture` containing the created `Channel`, fulfilled after the supplied `streamStateInitializer` has been executed on it.
public func createStreamChannel(_ streamStateInitializer: @escaping StreamInitializer) -> EventLoopFuture<Channel> {
self.inlineStreamMultiplexer.createStreamChannel(streamStateInitializer)
let promise = self.eventLoop.makePromise(of: Channel.self)
self.createStreamChannel(promise: promise, streamStateInitializer)
return promise.futureResult
}
}
}
Expand All @@ -227,20 +237,43 @@ extension NIOHTTP2Handler {
/// `Output`. This type may be `HTTP2Frame` or changed to any other type.
@available(macOS 10.15, iOS 13.0, watchOS 6.0, tvOS 13.0, *)
@_spi(AsyncChannel)
public struct AsyncStreamMultiplexer<InboundStreamOutput> {
private let inlineStreamMultiplexer: InlineStreamMultiplexer
public struct AsyncStreamMultiplexer<InboundStreamOutput: Sendable>: Sendable {
private let inlineStreamMultiplexer: NIOLoopBound<InlineStreamMultiplexer>
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is there a reason we don't use the sendable view here?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't think so, an oversight. I have used it here and fleshed out its API a bit more to mirror the underlying multiplexer.

private let eventLoop: EventLoop
public let inbound: NIOHTTP2InboundStreamChannels<InboundStreamOutput>

// Cannot be created by users.
internal init(_ inlineStreamMultiplexer: InlineStreamMultiplexer, continuation: any AnyContinuation, inboundStreamChannels: NIOHTTP2InboundStreamChannels<InboundStreamOutput>) {
self.inlineStreamMultiplexer = inlineStreamMultiplexer
self.inlineStreamMultiplexer.setChannelContinuation(continuation)
internal init(_ inlineStreamMultiplexer: InlineStreamMultiplexer, eventLoop: EventLoop, continuation: any AnyContinuation, inboundStreamChannels: NIOHTTP2InboundStreamChannels<InboundStreamOutput>) {
self.inlineStreamMultiplexer = NIOLoopBound(inlineStreamMultiplexer, eventLoop: eventLoop)
self.eventLoop = eventLoop
self.inlineStreamMultiplexer.value.setChannelContinuation(continuation)
self.inbound = inboundStreamChannels
}

/// Create a stream channel initialized with the provided closure
public func createStreamChannel<Output: Sendable>(_ initializer: @escaping NIOChannelInitializerWithOutput<Output>) async throws -> Output {
return try await self.inlineStreamMultiplexer.createStreamChannel(initializer).get()
if self.eventLoop.inEventLoop {
return try await self.inlineStreamMultiplexer.value.createStreamChannel(initializer).get()
} else {
return try await self.eventLoop.flatSubmit {
self.inlineStreamMultiplexer.value.createStreamChannel(initializer)
}.get()
}
}
}
}

extension InlineStreamMultiplexer {
/// InlineStreamMultiplexerSendableView exposes only the thread-safe API of InlineStreamMultiplexer
struct SendableView: @unchecked Sendable {
private let inlineStreamMultiplexer: InlineStreamMultiplexer

init(_ inlineStreamMultiplexer: InlineStreamMultiplexer) {
self.inlineStreamMultiplexer = inlineStreamMultiplexer
}

internal func createStreamChannel(promise: EventLoopPromise<Channel>?, _ streamStateInitializer: @escaping NIOHTTP2Handler.StreamInitializer) {
self.inlineStreamMultiplexer.createStreamChannel(promise: promise, streamStateInitializer)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm a bit confused: is this method thread-safe, or is the usage point protecting it?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good spot. It's intended that the method is thread-safe. I've moved the event loop dance inside this method to make that the case.

}
}
}
52 changes: 39 additions & 13 deletions Sources/NIOHTTP2/HTTP2ChannelHandler.swift
Original file line number Diff line number Diff line change
Expand Up @@ -175,7 +175,7 @@ public final class NIOHTTP2Handler: ChannelDuplexHandler {
}

/// The mode for this parser to operate in: client or server.
public enum ParserMode {
public enum ParserMode: Sendable {
/// Client mode
case client

Expand Down Expand Up @@ -851,9 +851,9 @@ extension NIOHTTP2Handler {
// Don't allow infinite recursion through this method.
return
}

self.isUnbufferingAndFlushingAutomaticFrames = true

loop: while true {
switch self.outboundBuffer.nextFlushedWritableFrame(channelWritable: self.channelWritable) {
case .noFrame:
Expand All @@ -864,7 +864,7 @@ extension NIOHTTP2Handler {
self.processOutboundFrame(context: context, frame: frame, promise: promise)
}
}

self.isUnbufferingAndFlushingAutomaticFrames = false
self.flushIfNecessary(context: context)
}
Expand Down Expand Up @@ -1119,8 +1119,9 @@ extension NIOHTTP2Handler {
return try self.syncMultiplexer()
}
} else {
let unsafeSelf = UnsafeTransfer(self)
return self.eventLoop!.submit {
return try self.syncMultiplexer()
return try unsafeSelf.wrappedValue.syncMultiplexer()
}
}
}
Expand All @@ -1132,24 +1133,49 @@ extension NIOHTTP2Handler {
/// > - The caller is already on the correct event loop.
public func syncMultiplexer() throws -> StreamMultiplexer {
self.eventLoop!.preconditionInEventLoop()
guard let inboundStreamMultiplexer = self.inboundStreamMultiplexer else {
throw NIOHTTP2Errors.missingMultiplexer()
}

switch self.inboundStreamMultiplexer {
case let .some(.inline(multiplexer)):
return StreamMultiplexer(multiplexer)
case .some(.legacy), .none:
switch inboundStreamMultiplexer {
case let .inline(multiplexer):
return StreamMultiplexer(multiplexer, eventLoop: self.eventLoop!)
case .legacy:
throw NIOHTTP2Errors.missingMultiplexer()
}
}

@available(macOS 10.15, iOS 13.0, watchOS 6.0, tvOS 13.0, *)
internal func syncAsyncStreamMultiplexer<Output: Sendable>(continuation: any AnyContinuation, inboundStreamChannels: NIOHTTP2InboundStreamChannels<Output>) throws -> AsyncStreamMultiplexer<Output> {
self.eventLoop!.preconditionInEventLoop()
guard let inboundStreamMultiplexer = self.inboundStreamMultiplexer else {
throw NIOHTTP2Errors.missingMultiplexer()
}

switch self.inboundStreamMultiplexer {
case let .some(.inline(multiplexer)):
return AsyncStreamMultiplexer(multiplexer, continuation: continuation, inboundStreamChannels: inboundStreamChannels)
case .some(.legacy), .none:
switch inboundStreamMultiplexer {
case let .inline(multiplexer):
return AsyncStreamMultiplexer(multiplexer, eventLoop: self.eventLoop!, continuation: continuation, inboundStreamChannels: inboundStreamChannels)
case .legacy:
throw NIOHTTP2Errors.missingMultiplexer()
}
}
}

/// ``UnsafeTransfer`` can be used to make non-`Sendable` values `Sendable`.
/// As the name implies, the usage of this is unsafe because it disables the sendable checking of the compiler.
/// It can be used similar to `@unsafe Sendable` but for values instead of types.
@usableFromInline
struct UnsafeTransfer<Wrapped> {
@usableFromInline
var wrappedValue: Wrapped

@inlinable
init(_ wrappedValue: Wrapped) {
self.wrappedValue = wrappedValue
}
}

extension UnsafeTransfer: @unchecked Sendable {}

extension UnsafeTransfer: Equatable where Wrapped: Equatable {}
extension UnsafeTransfer: Hashable where Wrapped: Hashable {}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can we put this in its own file rather than smuggling it into here?

49 changes: 32 additions & 17 deletions Sources/NIOHTTP2/HTTP2CommonInboundStreamMultiplexer.swift
Original file line number Diff line number Diff line change
Expand Up @@ -112,7 +112,7 @@ extension HTTP2CommonInboundStreamMultiplexer {
// If we have an async sequence of inbound stream channels yield the channel to it
// but only once we are sure initialization and activation succeed
if let streamChannelContinuation = self.streamChannelContinuation {
let promise = self.channel.eventLoop.makePromise(of: Any.self)
let promise = self.channel.eventLoop.makePromise(of: (any Sendable).self)
promise.futureResult.whenSuccess { value in
streamChannelContinuation.yield(any: value)
}
Expand Down Expand Up @@ -316,9 +316,9 @@ extension HTTP2CommonInboundStreamMultiplexer {
streamStateInitializer(channel).map { return $0 }
}

let anyPromise: EventLoopPromise<Any>?
let anyPromise: EventLoopPromise<(any Sendable)>?
if let promise = promise {
anyPromise = channel.baseChannel.eventLoop.makePromise(of: Any.self)
anyPromise = channel.baseChannel.eventLoop.makePromise(of: (any Sendable).self)
anyPromise?.futureResult.whenComplete { result in
switch result {
case .success(let any):
Expand All @@ -344,8 +344,13 @@ extension HTTP2CommonInboundStreamMultiplexer {
// Always create streams channels on the next event loop tick. This avoids re-entrancy
// issues where handlers interposed between the two HTTP/2 handlers could create streams
// in channel active which become activated twice.
//
// We are safe to use NIOLoopBounds here because the public API ensures that we are on the right event loop
// when we get to this code. Whilst it is possible that the multiplexer was created on the wrong event loop
// such an eventuality would lead us to assert immediately so we would quickly discover it.
let loopBounds = NIOLoopBound((self, multiplexer), eventLoop: self.channel.eventLoop)
self.channel.eventLoop.execute {
self._createStreamChannel(multiplexer, promise, streamStateInitializer)
loopBounds.value.0._createStreamChannel(loopBounds.value.1, promise, streamStateInitializer)
}
}

Expand Down Expand Up @@ -386,14 +391,20 @@ extension HTTP2CommonInboundStreamMultiplexer {
// Always create streams channels on the next event loop tick. This avoids re-entrancy
// issues where handlers interposed between the two HTTP/2 handlers could create streams
// in channel active which become activated twice.
//
// We are safe to use NIOLoopBounds here because the public API ensures that we are on the right event loop
// when we get to this code. Whilst it is possible that the multiplexer was created on the wrong event loop
// such an eventuality would lead us to assert immediately so we would quickly discover it.
let loopBounds = NIOLoopBound((self, multiplexer), eventLoop: self.channel.eventLoop)
self.channel.eventLoop.execute {
self._createStreamChannel(multiplexer, promise, streamStateInitializer)
loopBounds.value.0._createStreamChannel(loopBounds.value.1, promise, streamStateInitializer)
}
}

internal func createStreamChannel(
multiplexer: HTTP2StreamChannel.OutboundStreamMultiplexer,
_ streamStateInitializer: @escaping NIOChannelInitializer) -> EventLoopFuture<Channel> {
_ streamStateInitializer: @escaping NIOChannelInitializer
) -> EventLoopFuture<Channel> {
let promise = self.channel.eventLoop.makePromise(of: Channel.self)
self.createStreamChannel(multiplexer: multiplexer, promise: promise, streamStateInitializer)
return promise.futureResult
Expand All @@ -403,21 +414,25 @@ extension HTTP2CommonInboundStreamMultiplexer {
internal func createStreamChannel(
multiplexer: HTTP2StreamChannel.OutboundStreamMultiplexer,
promise: EventLoopPromise<Channel>?,
_ streamStateInitializer: @escaping (Channel, HTTP2StreamID) -> EventLoopFuture<Void>
_ streamStateInitializer: @escaping NIOChannelInitializerWithStreamID
) {
// We are safe to use NIOLoopBounds here because the public API ensures that we are on the right event loop
// when we get to this code. Whilst it is possible that the multiplexer was created on the wrong event loop
// such an eventuality would lead us to assert immediately so we would quickly discover it.
let loopBounds = NIOLoopBound((self, multiplexer), eventLoop: self.channel.eventLoop)
self.channel.eventLoop.execute {
let streamID = self.nextStreamID()
let streamID = loopBounds.value.0.nextStreamID()
let channel = MultiplexerAbstractChannel(
allocator: self.channel.allocator,
parent: self.channel,
multiplexer: multiplexer,
allocator: loopBounds.value.0.channel.allocator,
parent: loopBounds.value.0.channel,
multiplexer: loopBounds.value.1,
streamID: streamID,
targetWindowSize: Int32(self.targetWindowSize),
outboundBytesHighWatermark: self.streamChannelOutboundBytesHighWatermark,
outboundBytesLowWatermark: self.streamChannelOutboundBytesLowWatermark,
targetWindowSize: Int32(loopBounds.value.0.targetWindowSize),
outboundBytesHighWatermark: loopBounds.value.0.streamChannelOutboundBytesHighWatermark,
outboundBytesLowWatermark: loopBounds.value.0.streamChannelOutboundBytesLowWatermark,
inboundStreamStateInitializer: .includesStreamID(nil)
)
self.streams[streamID] = channel
loopBounds.value.0.streams[streamID] = channel
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Repeatedly unpacking the loop bounds here kinda sucks: can we unwrap them at the top of the execute { } block and then use their names?

channel.configure(initializer: streamStateInitializer, userPromise: promise)
}
}
Expand Down Expand Up @@ -446,7 +461,7 @@ extension HTTP2CommonInboundStreamMultiplexer {

/// `ChannelContinuation` is used to generic async-sequence-like objects to deal with `Channel`s. This is so that they may be held
/// by the `HTTP2ChannelHandler` without causing it to become generic itself.
internal protocol AnyContinuation {
internal protocol AnyContinuation: Sendable {
func yield(any: Any)
func finish()
func finish(throwing error: Error)
Expand All @@ -457,7 +472,7 @@ internal protocol AnyContinuation {
/// They make use of generics to allow for wrapping the stream `Channel`s, for example as `NIOAsyncChannel`s or protocol negotiation objects.
@available(macOS 10.15, iOS 13.0, watchOS 6.0, tvOS 13.0, *)
@_spi(AsyncChannel)
public struct NIOHTTP2InboundStreamChannels<Output>: AsyncSequence {
public struct NIOHTTP2InboundStreamChannels<Output: Sendable>: AsyncSequence {
public struct AsyncIterator: AsyncIteratorProtocol {
public typealias Element = Output

Expand Down
Loading