-
Notifications
You must be signed in to change notification settings - Fork 84
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Sendability warnings #417
Sendability warnings #417
Changes from 3 commits
823875d
2df3fd3
b12bff7
3616f0e
939759e
ceba056
3b861c5
6a08241
fe57f5e
7d47eb1
0482873
6f79b31
95bae6f
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -164,14 +164,16 @@ extension NIOHTTP2Handler { | |
/// on ``HTTP2Frame/FramePayload`` objects as their base communication | ||
/// atom, as opposed to the regular NIO `SelectableChannel` objects which use `ByteBuffer` | ||
/// and `IOData`. | ||
public struct StreamMultiplexer: @unchecked Sendable { | ||
// '@unchecked Sendable' because this state is not intrinsically `Sendable` | ||
// but it is only accessed in `createStreamChannel` which executes the work on the right event loop | ||
private let inlineStreamMultiplexer: InlineStreamMultiplexer | ||
public struct StreamMultiplexer: Sendable { | ||
|
||
private let inlineStreamMultiplexer: InlineStreamMultiplexer.SendableView | ||
|
||
private let eventLoop: EventLoop | ||
|
||
/// Cannot be created by users. | ||
internal init(_ inlineStreamMultiplexer: InlineStreamMultiplexer) { | ||
self.inlineStreamMultiplexer = inlineStreamMultiplexer | ||
internal init(_ inlineStreamMultiplexer: InlineStreamMultiplexer, eventLoop: EventLoop) { | ||
self.inlineStreamMultiplexer = InlineStreamMultiplexer.SendableView(inlineStreamMultiplexer) | ||
self.eventLoop = eventLoop | ||
} | ||
|
||
/// Create a new `Channel` for a new stream initiated by this peer. | ||
|
@@ -187,7 +189,13 @@ extension NIOHTTP2Handler { | |
/// - streamStateInitializer: A callback that will be invoked to allow you to configure the | ||
/// `ChannelPipeline` for the newly created channel. | ||
public func createStreamChannel(promise: EventLoopPromise<Channel>?, _ streamStateInitializer: @escaping StreamInitializer) { | ||
self.inlineStreamMultiplexer.createStreamChannel(promise: promise, streamStateInitializer) | ||
if self.eventLoop.inEventLoop { | ||
self.inlineStreamMultiplexer.createStreamChannel(promise: promise, streamStateInitializer) | ||
} else { | ||
self.eventLoop.execute { | ||
self.inlineStreamMultiplexer.createStreamChannel(promise: promise, streamStateInitializer) | ||
} | ||
} | ||
} | ||
|
||
/// Create a new `Channel` for a new stream initiated by this peer. | ||
|
@@ -201,7 +209,9 @@ extension NIOHTTP2Handler { | |
/// `ChannelPipeline` for the newly created channel. | ||
/// - Returns: An `EventLoopFuture` containing the created `Channel`, fulfilled after the supplied `streamStateInitializer` has been executed on it. | ||
public func createStreamChannel(_ streamStateInitializer: @escaping StreamInitializer) -> EventLoopFuture<Channel> { | ||
self.inlineStreamMultiplexer.createStreamChannel(streamStateInitializer) | ||
let promise = self.eventLoop.makePromise(of: Channel.self) | ||
self.createStreamChannel(promise: promise, streamStateInitializer) | ||
return promise.futureResult | ||
} | ||
} | ||
} | ||
|
@@ -227,20 +237,43 @@ extension NIOHTTP2Handler { | |
/// `Output`. This type may be `HTTP2Frame` or changed to any other type. | ||
@available(macOS 10.15, iOS 13.0, watchOS 6.0, tvOS 13.0, *) | ||
@_spi(AsyncChannel) | ||
public struct AsyncStreamMultiplexer<InboundStreamOutput> { | ||
private let inlineStreamMultiplexer: InlineStreamMultiplexer | ||
public struct AsyncStreamMultiplexer<InboundStreamOutput: Sendable>: Sendable { | ||
private let inlineStreamMultiplexer: NIOLoopBound<InlineStreamMultiplexer> | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Is there a reason we don't use the sendable view here? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I don't think so, an oversight. I have used it here and fleshed out its API a bit more to mirror the underlying multiplexer. |
||
private let eventLoop: EventLoop | ||
public let inbound: NIOHTTP2InboundStreamChannels<InboundStreamOutput> | ||
|
||
// Cannot be created by users. | ||
internal init(_ inlineStreamMultiplexer: InlineStreamMultiplexer, continuation: any AnyContinuation, inboundStreamChannels: NIOHTTP2InboundStreamChannels<InboundStreamOutput>) { | ||
self.inlineStreamMultiplexer = inlineStreamMultiplexer | ||
self.inlineStreamMultiplexer.setChannelContinuation(continuation) | ||
internal init(_ inlineStreamMultiplexer: InlineStreamMultiplexer, eventLoop: EventLoop, continuation: any AnyContinuation, inboundStreamChannels: NIOHTTP2InboundStreamChannels<InboundStreamOutput>) { | ||
self.inlineStreamMultiplexer = NIOLoopBound(inlineStreamMultiplexer, eventLoop: eventLoop) | ||
self.eventLoop = eventLoop | ||
self.inlineStreamMultiplexer.value.setChannelContinuation(continuation) | ||
self.inbound = inboundStreamChannels | ||
} | ||
|
||
/// Create a stream channel initialized with the provided closure | ||
public func createStreamChannel<Output: Sendable>(_ initializer: @escaping NIOChannelInitializerWithOutput<Output>) async throws -> Output { | ||
return try await self.inlineStreamMultiplexer.createStreamChannel(initializer).get() | ||
if self.eventLoop.inEventLoop { | ||
return try await self.inlineStreamMultiplexer.value.createStreamChannel(initializer).get() | ||
} else { | ||
return try await self.eventLoop.flatSubmit { | ||
self.inlineStreamMultiplexer.value.createStreamChannel(initializer) | ||
}.get() | ||
} | ||
} | ||
} | ||
} | ||
|
||
extension InlineStreamMultiplexer { | ||
/// InlineStreamMultiplexerSendableView exposes only the thread-safe API of InlineStreamMultiplexer | ||
struct SendableView: @unchecked Sendable { | ||
private let inlineStreamMultiplexer: InlineStreamMultiplexer | ||
|
||
init(_ inlineStreamMultiplexer: InlineStreamMultiplexer) { | ||
self.inlineStreamMultiplexer = inlineStreamMultiplexer | ||
} | ||
|
||
internal func createStreamChannel(promise: EventLoopPromise<Channel>?, _ streamStateInitializer: @escaping NIOHTTP2Handler.StreamInitializer) { | ||
self.inlineStreamMultiplexer.createStreamChannel(promise: promise, streamStateInitializer) | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I'm a bit confused: is this method thread-safe, or is the usage point protecting it? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Good spot. It's intended that the method is thread-safe. I've moved the event loop dance inside this method to make that the case. |
||
} | ||
} | ||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -175,7 +175,7 @@ public final class NIOHTTP2Handler: ChannelDuplexHandler { | |
} | ||
|
||
/// The mode for this parser to operate in: client or server. | ||
public enum ParserMode { | ||
public enum ParserMode: Sendable { | ||
/// Client mode | ||
case client | ||
|
||
|
@@ -851,9 +851,9 @@ extension NIOHTTP2Handler { | |
// Don't allow infinite recursion through this method. | ||
return | ||
} | ||
|
||
self.isUnbufferingAndFlushingAutomaticFrames = true | ||
|
||
loop: while true { | ||
switch self.outboundBuffer.nextFlushedWritableFrame(channelWritable: self.channelWritable) { | ||
case .noFrame: | ||
|
@@ -864,7 +864,7 @@ extension NIOHTTP2Handler { | |
self.processOutboundFrame(context: context, frame: frame, promise: promise) | ||
} | ||
} | ||
|
||
self.isUnbufferingAndFlushingAutomaticFrames = false | ||
self.flushIfNecessary(context: context) | ||
} | ||
|
@@ -1119,8 +1119,9 @@ extension NIOHTTP2Handler { | |
return try self.syncMultiplexer() | ||
} | ||
} else { | ||
let unsafeSelf = UnsafeTransfer(self) | ||
return self.eventLoop!.submit { | ||
return try self.syncMultiplexer() | ||
return try unsafeSelf.wrappedValue.syncMultiplexer() | ||
} | ||
} | ||
} | ||
|
@@ -1132,24 +1133,49 @@ extension NIOHTTP2Handler { | |
/// > - The caller is already on the correct event loop. | ||
public func syncMultiplexer() throws -> StreamMultiplexer { | ||
self.eventLoop!.preconditionInEventLoop() | ||
guard let inboundStreamMultiplexer = self.inboundStreamMultiplexer else { | ||
throw NIOHTTP2Errors.missingMultiplexer() | ||
} | ||
|
||
switch self.inboundStreamMultiplexer { | ||
case let .some(.inline(multiplexer)): | ||
return StreamMultiplexer(multiplexer) | ||
case .some(.legacy), .none: | ||
switch inboundStreamMultiplexer { | ||
case let .inline(multiplexer): | ||
return StreamMultiplexer(multiplexer, eventLoop: self.eventLoop!) | ||
case .legacy: | ||
throw NIOHTTP2Errors.missingMultiplexer() | ||
} | ||
} | ||
|
||
@available(macOS 10.15, iOS 13.0, watchOS 6.0, tvOS 13.0, *) | ||
internal func syncAsyncStreamMultiplexer<Output: Sendable>(continuation: any AnyContinuation, inboundStreamChannels: NIOHTTP2InboundStreamChannels<Output>) throws -> AsyncStreamMultiplexer<Output> { | ||
self.eventLoop!.preconditionInEventLoop() | ||
guard let inboundStreamMultiplexer = self.inboundStreamMultiplexer else { | ||
throw NIOHTTP2Errors.missingMultiplexer() | ||
} | ||
|
||
switch self.inboundStreamMultiplexer { | ||
case let .some(.inline(multiplexer)): | ||
return AsyncStreamMultiplexer(multiplexer, continuation: continuation, inboundStreamChannels: inboundStreamChannels) | ||
case .some(.legacy), .none: | ||
switch inboundStreamMultiplexer { | ||
case let .inline(multiplexer): | ||
return AsyncStreamMultiplexer(multiplexer, eventLoop: self.eventLoop!, continuation: continuation, inboundStreamChannels: inboundStreamChannels) | ||
case .legacy: | ||
throw NIOHTTP2Errors.missingMultiplexer() | ||
} | ||
} | ||
} | ||
|
||
/// ``UnsafeTransfer`` can be used to make non-`Sendable` values `Sendable`. | ||
/// As the name implies, the usage of this is unsafe because it disables the sendable checking of the compiler. | ||
/// It can be used similar to `@unsafe Sendable` but for values instead of types. | ||
@usableFromInline | ||
struct UnsafeTransfer<Wrapped> { | ||
@usableFromInline | ||
var wrappedValue: Wrapped | ||
|
||
@inlinable | ||
init(_ wrappedValue: Wrapped) { | ||
self.wrappedValue = wrappedValue | ||
} | ||
} | ||
|
||
extension UnsafeTransfer: @unchecked Sendable {} | ||
|
||
extension UnsafeTransfer: Equatable where Wrapped: Equatable {} | ||
extension UnsafeTransfer: Hashable where Wrapped: Hashable {} | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Can we put this in its own file rather than smuggling it into here? |
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -112,7 +112,7 @@ extension HTTP2CommonInboundStreamMultiplexer { | |
// If we have an async sequence of inbound stream channels yield the channel to it | ||
// but only once we are sure initialization and activation succeed | ||
if let streamChannelContinuation = self.streamChannelContinuation { | ||
let promise = self.channel.eventLoop.makePromise(of: Any.self) | ||
let promise = self.channel.eventLoop.makePromise(of: (any Sendable).self) | ||
promise.futureResult.whenSuccess { value in | ||
streamChannelContinuation.yield(any: value) | ||
} | ||
|
@@ -316,9 +316,9 @@ extension HTTP2CommonInboundStreamMultiplexer { | |
streamStateInitializer(channel).map { return $0 } | ||
} | ||
|
||
let anyPromise: EventLoopPromise<Any>? | ||
let anyPromise: EventLoopPromise<(any Sendable)>? | ||
if let promise = promise { | ||
anyPromise = channel.baseChannel.eventLoop.makePromise(of: Any.self) | ||
anyPromise = channel.baseChannel.eventLoop.makePromise(of: (any Sendable).self) | ||
anyPromise?.futureResult.whenComplete { result in | ||
switch result { | ||
case .success(let any): | ||
|
@@ -344,8 +344,13 @@ extension HTTP2CommonInboundStreamMultiplexer { | |
// Always create streams channels on the next event loop tick. This avoids re-entrancy | ||
// issues where handlers interposed between the two HTTP/2 handlers could create streams | ||
// in channel active which become activated twice. | ||
// | ||
// We are safe to use NIOLoopBounds here because the public API ensures that we are on the right event loop | ||
// when we get to this code. Whilst it is possible that the multiplexer was created on the wrong event loop | ||
// such an eventuality would lead us to assert immediately so we would quickly discover it. | ||
let loopBounds = NIOLoopBound((self, multiplexer), eventLoop: self.channel.eventLoop) | ||
self.channel.eventLoop.execute { | ||
self._createStreamChannel(multiplexer, promise, streamStateInitializer) | ||
loopBounds.value.0._createStreamChannel(loopBounds.value.1, promise, streamStateInitializer) | ||
} | ||
} | ||
|
||
|
@@ -386,14 +391,20 @@ extension HTTP2CommonInboundStreamMultiplexer { | |
// Always create streams channels on the next event loop tick. This avoids re-entrancy | ||
// issues where handlers interposed between the two HTTP/2 handlers could create streams | ||
// in channel active which become activated twice. | ||
// | ||
// We are safe to use NIOLoopBounds here because the public API ensures that we are on the right event loop | ||
// when we get to this code. Whilst it is possible that the multiplexer was created on the wrong event loop | ||
// such an eventuality would lead us to assert immediately so we would quickly discover it. | ||
let loopBounds = NIOLoopBound((self, multiplexer), eventLoop: self.channel.eventLoop) | ||
self.channel.eventLoop.execute { | ||
self._createStreamChannel(multiplexer, promise, streamStateInitializer) | ||
loopBounds.value.0._createStreamChannel(loopBounds.value.1, promise, streamStateInitializer) | ||
} | ||
} | ||
|
||
internal func createStreamChannel( | ||
multiplexer: HTTP2StreamChannel.OutboundStreamMultiplexer, | ||
_ streamStateInitializer: @escaping NIOChannelInitializer) -> EventLoopFuture<Channel> { | ||
_ streamStateInitializer: @escaping NIOChannelInitializer | ||
) -> EventLoopFuture<Channel> { | ||
let promise = self.channel.eventLoop.makePromise(of: Channel.self) | ||
self.createStreamChannel(multiplexer: multiplexer, promise: promise, streamStateInitializer) | ||
return promise.futureResult | ||
|
@@ -403,21 +414,25 @@ extension HTTP2CommonInboundStreamMultiplexer { | |
internal func createStreamChannel( | ||
multiplexer: HTTP2StreamChannel.OutboundStreamMultiplexer, | ||
promise: EventLoopPromise<Channel>?, | ||
_ streamStateInitializer: @escaping (Channel, HTTP2StreamID) -> EventLoopFuture<Void> | ||
_ streamStateInitializer: @escaping NIOChannelInitializerWithStreamID | ||
) { | ||
// We are safe to use NIOLoopBounds here because the public API ensures that we are on the right event loop | ||
// when we get to this code. Whilst it is possible that the multiplexer was created on the wrong event loop | ||
// such an eventuality would lead us to assert immediately so we would quickly discover it. | ||
let loopBounds = NIOLoopBound((self, multiplexer), eventLoop: self.channel.eventLoop) | ||
self.channel.eventLoop.execute { | ||
let streamID = self.nextStreamID() | ||
let streamID = loopBounds.value.0.nextStreamID() | ||
let channel = MultiplexerAbstractChannel( | ||
allocator: self.channel.allocator, | ||
parent: self.channel, | ||
multiplexer: multiplexer, | ||
allocator: loopBounds.value.0.channel.allocator, | ||
parent: loopBounds.value.0.channel, | ||
multiplexer: loopBounds.value.1, | ||
streamID: streamID, | ||
targetWindowSize: Int32(self.targetWindowSize), | ||
outboundBytesHighWatermark: self.streamChannelOutboundBytesHighWatermark, | ||
outboundBytesLowWatermark: self.streamChannelOutboundBytesLowWatermark, | ||
targetWindowSize: Int32(loopBounds.value.0.targetWindowSize), | ||
outboundBytesHighWatermark: loopBounds.value.0.streamChannelOutboundBytesHighWatermark, | ||
outboundBytesLowWatermark: loopBounds.value.0.streamChannelOutboundBytesLowWatermark, | ||
inboundStreamStateInitializer: .includesStreamID(nil) | ||
) | ||
self.streams[streamID] = channel | ||
loopBounds.value.0.streams[streamID] = channel | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Repeatedly unpacking the loop bounds here kinda sucks: can we unwrap them at the top of the |
||
channel.configure(initializer: streamStateInitializer, userPromise: promise) | ||
} | ||
} | ||
|
@@ -446,7 +461,7 @@ extension HTTP2CommonInboundStreamMultiplexer { | |
|
||
/// `ChannelContinuation` is used to generic async-sequence-like objects to deal with `Channel`s. This is so that they may be held | ||
/// by the `HTTP2ChannelHandler` without causing it to become generic itself. | ||
internal protocol AnyContinuation { | ||
internal protocol AnyContinuation: Sendable { | ||
func yield(any: Any) | ||
func finish() | ||
func finish(throwing error: Error) | ||
|
@@ -457,7 +472,7 @@ internal protocol AnyContinuation { | |
/// They make use of generics to allow for wrapping the stream `Channel`s, for example as `NIOAsyncChannel`s or protocol negotiation objects. | ||
@available(macOS 10.15, iOS 13.0, watchOS 6.0, tvOS 13.0, *) | ||
@_spi(AsyncChannel) | ||
public struct NIOHTTP2InboundStreamChannels<Output>: AsyncSequence { | ||
public struct NIOHTTP2InboundStreamChannels<Output: Sendable>: AsyncSequence { | ||
public struct AsyncIterator: AsyncIteratorProtocol { | ||
public typealias Element = Output | ||
|
||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
See my comment on the
SendableView
.