diff --git a/Sources/NIOTransportServices/Datagram/NIOTSDatagramBootstrap.swift b/Sources/NIOTransportServices/Datagram/NIOTSDatagramBootstrap.swift new file mode 100644 index 0000000..611ad85 --- /dev/null +++ b/Sources/NIOTransportServices/Datagram/NIOTSDatagramBootstrap.swift @@ -0,0 +1,216 @@ +//===----------------------------------------------------------------------===// +// +// This source file is part of the SwiftNIO open source project +// +// Copyright (c) 2020-2023 Apple Inc. and the SwiftNIO project authors +// Licensed under Apache License v2.0 +// +// See LICENSE.txt for license information +// See CONTRIBUTORS.txt for the list of SwiftNIO project authors +// +// SPDX-License-Identifier: Apache-2.0 +// +//===----------------------------------------------------------------------===// + +#if canImport(Network) +import NIOCore +import Dispatch +import Network + +/// A `NIOTSDatagramBootstrap` is an easy way to bootstrap a `NIOTSDatagramChannel` when creating network clients. +/// +/// Usually you re-use a `NIOTSDatagramBootstrap` once you set it up, calling `connect` multiple times on the same bootstrap. +/// This way you ensure that the same `EventLoop`s will be shared across all your connections. +/// +/// Example: +/// +/// ```swift +/// let group = NIOTSEventLoopGroup() +/// defer { +/// try! group.syncShutdownGracefully() +/// } +/// let bootstrap = NIOTSDatagramBootstrap(group: group) +/// .channelInitializer { channel in +/// channel.pipeline.addHandler(MyChannelHandler()) +/// } +/// try! bootstrap.connect(host: "example.org", port: 12345).wait() +/// /* the Channel is now connected */ +/// ``` +/// +/// The connected `NIOTSDatagramChannel` will operate on `ByteBuffer` as inbound and outbound messages. +@available(OSX 10.14, iOS 12.0, tvOS 12.0, watchOS 6.0, *) +public final class NIOTSDatagramBootstrap { + private let group: EventLoopGroup + private var channelInitializer: ((Channel) -> EventLoopFuture)? + private var connectTimeout: TimeAmount = TimeAmount.seconds(10) + private var channelOptions = ChannelOptions.Storage() + private var qos: DispatchQoS? + private var udpOptions: NWProtocolUDP.Options = .init() + private var tlsOptions: NWProtocolTLS.Options? + + /// Create a `NIOTSDatagramConnectionBootstrap` on the `EventLoopGroup` `group`. + /// + /// This initializer only exists to be more in-line with the NIO core bootstraps, in that they + /// may be constructed with an `EventLoopGroup` and by extension an `EventLoop`. As such an + /// existing `NIOTSEventLoop` may be used to initialize this bootstrap. Where possible the + /// initializers accepting `NIOTSEventLoopGroup` should be used instead to avoid the wrong + /// type being used. + /// + /// Note that the "real" solution is described in https://github.com/apple/swift-nio/issues/674. + /// + /// - parameters: + /// - group: The `EventLoopGroup` to use. + public init(group: EventLoopGroup) { + self.group = group + } + + /// Create a `NIOTSDatagramConnectionBootstrap` on the `NIOTSEventLoopGroup` `group`. + /// + /// - parameters: + /// - group: The `NIOTSEventLoopGroup` to use. + public convenience init(group: NIOTSEventLoopGroup) { + self.init(group: group as EventLoopGroup) + } + + /// Initialize the connected `NIOTSDatagramConnectionChannel` with `initializer`. The most common task in initializer is to add + /// `ChannelHandler`s to the `ChannelPipeline`. + /// + /// The connected `Channel` will operate on `ByteBuffer` as inbound and outbound messages. + /// + /// - parameters: + /// - handler: A closure that initializes the provided `Channel`. + public func channelInitializer(_ handler: @escaping (Channel) -> EventLoopFuture) -> Self { + self.channelInitializer = handler + return self + } + + /// Specifies a `ChannelOption` to be applied to the `NIOTSDatagramConnectionChannel`. + /// + /// - parameters: + /// - option: The option to be applied. + /// - value: The value for the option. + public func channelOption(_ option: Option, value: Option.Value) -> Self { + channelOptions.append(key: option, value: value) + return self + } + + /// Specifies a timeout to apply to a connection attempt. + // + /// - parameters: + /// - timeout: The timeout that will apply to the connection attempt. + public func connectTimeout(_ timeout: TimeAmount) -> Self { + self.connectTimeout = timeout + return self + } + + /// Specifies a QoS to use for this connection, instead of the default QoS for the + /// event loop. + /// + /// This allows unusually high or low priority workloads to be appropriately scheduled. + public func withQoS(_ qos: DispatchQoS) -> Self { + self.qos = qos + return self + } + + /// Specifies the UDP options to use on the `Channel`s. + /// + /// To retrieve the UDP options from connected channels, use + /// `NIOTSChannelOptions.UDPConfiguration`. It is not possible to change the + /// UDP configuration after `connect` is called. + public func udpOptions(_ options: NWProtocolUDP.Options) -> Self { + self.udpOptions = options + return self + } + + /// Specifies the TLS options to use on the `Channel`s. + /// + /// To retrieve the TLS options from connected channels, use + /// `NIOTSChannelOptions.TLSConfiguration`. It is not possible to change the + /// TLS configuration after `connect` is called. + public func tlsOptions(_ options: NWProtocolTLS.Options) -> Self { + self.tlsOptions = options + return self + } + + /// Specify the `host` and `port` to connect to for the UDP `Channel` that will be established. + /// + /// - parameters: + /// - host: The host to connect to. + /// - port: The port to connect to. + /// - returns: An `EventLoopFuture` to deliver the `Channel` when connected. + public func connect(host: String, port: Int) -> EventLoopFuture { + guard let actualPort = NWEndpoint.Port(rawValue: UInt16(port)) else { + return self.group.next().makeFailedFuture(NIOTSErrors.InvalidPort(port: port)) + } + return self.connect(endpoint: NWEndpoint.hostPort(host: .init(host), port: actualPort)) + } + + /// Specify the `address` to connect to for the UDP `Channel` that will be established. + /// + /// - parameters: + /// - address: The address to connect to. + /// - returns: An `EventLoopFuture` to deliver the `Channel` when connected. + public func connect(to address: SocketAddress) -> EventLoopFuture { + return self.connect0 { channel, promise in + channel.bind(to: address, promise: promise) + } + } + + /// Specify the `unixDomainSocket` path to connect to for the UDS `Channel` that will be established. + /// + /// - parameters: + /// - unixDomainSocketPath: The _Unix domain socket_ path to connect to. + /// - returns: An `EventLoopFuture` to deliver the `Channel` when connected. + public func connect(unixDomainSocketPath: String) -> EventLoopFuture { + do { + let address = try SocketAddress(unixDomainSocketPath: unixDomainSocketPath) + return connect(to: address) + } catch { + return group.next().makeFailedFuture(error) + } + } + + /// Specify the `endpoint` to connect to for the UDP `Channel` that will be established. + public func connect(endpoint: NWEndpoint) -> EventLoopFuture { + return self.connect0 { channel, promise in + channel.triggerUserOutboundEvent( + NIOTSNetworkEvents.ConnectToNWEndpoint(endpoint: endpoint), + promise: promise + ) + } + } + + private func connect0(_ binder: @escaping (Channel, EventLoopPromise) -> Void) -> EventLoopFuture { + let conn: Channel = NIOTSDatagramChannel(eventLoop: self.group.next() as! NIOTSEventLoop, + qos: self.qos, + udpOptions: self.udpOptions, + tlsOptions: self.tlsOptions) + let initializer = self.channelInitializer ?? { _ in conn.eventLoop.makeSucceededFuture(()) } + let channelOptions = self.channelOptions + + return conn.eventLoop.submit { + return channelOptions.applyAllChannelOptions(to: conn).flatMap { + initializer(conn) + }.flatMap { + conn.eventLoop.assertInEventLoop() + return conn.register() + }.flatMap { + let connectPromise: EventLoopPromise = conn.eventLoop.makePromise() + binder(conn, connectPromise) + let cancelTask = conn.eventLoop.scheduleTask(in: self.connectTimeout) { + connectPromise.fail(ChannelError.connectTimeout(self.connectTimeout)) + conn.close(promise: nil) + } + + connectPromise.futureResult.whenComplete { (_: Result) in + cancelTask.cancel() + } + return connectPromise.futureResult + }.map { conn }.flatMapErrorThrowing { + conn.close(promise: nil) + throw $0 + } + }.flatMap { $0 } + } +} +#endif diff --git a/Sources/NIOTransportServices/Datagram/NIOTSDatagramChannel.swift b/Sources/NIOTransportServices/Datagram/NIOTSDatagramChannel.swift new file mode 100644 index 0000000..a6a48e5 --- /dev/null +++ b/Sources/NIOTransportServices/Datagram/NIOTSDatagramChannel.swift @@ -0,0 +1,190 @@ +//===----------------------------------------------------------------------===// +// +// This source file is part of the SwiftNIO open source project +// +// Copyright (c) 2020-2023 Apple Inc. and the SwiftNIO project authors +// Licensed under Apache License v2.0 +// +// See LICENSE.txt for license information +// See CONTRIBUTORS.txt for the list of SwiftNIO project authors +// +// SPDX-License-Identifier: Apache-2.0 +// +//===----------------------------------------------------------------------===// + +#if canImport(Network) +import Atomics +import Foundation +import NIOCore +import NIOConcurrencyHelpers +import NIOFoundationCompat +import NIOTLS +import Dispatch +import Network +import Security + +@available(OSX 10.14, iOS 12.0, tvOS 12.0, watchOS 6.0, *) +internal final class NIOTSDatagramChannel: StateManagedNWConnectionChannel { + typealias ActiveSubstate = UDPSubstate + + enum UDPSubstate: NWConnectionSubstate { + case open, closed + + init() { + self = .open + } + + static func closeInput(state: inout ChannelState) throws { + throw NIOTSErrors.InvalidChannelStateTransition() + } + + static func closeOutput(state: inout ChannelState) throws { + throw NIOTSErrors.InvalidChannelStateTransition() + } + } + + /// The kinds of channel activation this channel supports + internal let supportedActivationType: ActivationType = .connect + + /// The `ByteBufferAllocator` for this `Channel`. + public let allocator = ByteBufferAllocator() + + /// An `EventLoopFuture` that will complete when this channel is finally closed. + public var closeFuture: EventLoopFuture { + return self.closePromise.futureResult + } + + /// The parent `Channel` for this one, if any. + public let parent: Channel? + + /// The `EventLoop` this `Channel` belongs to. + internal let tsEventLoop: NIOTSEventLoop + + private(set) var _pipeline: ChannelPipeline! = nil // this is really a constant (set in .init) but needs `self` to be constructed and therefore a `var`. Do not change as this needs to accessed from arbitrary threads. + + internal let closePromise: EventLoopPromise + + /// The underlying `NWConnection` that this `Channel` wraps. This is only non-nil + /// after the initial connection attempt has been made. + internal var connection: NWConnection? + + /// The `DispatchQueue` that socket events for this connection will be dispatched onto. + internal let connectionQueue: DispatchQueue + + /// An `EventLoopPromise` that will be succeeded or failed when a connection attempt succeeds or fails. + internal var connectPromise: EventLoopPromise? + + /// The UDP options for this connection. + private let udpOptions: NWProtocolUDP.Options + + internal var nwOptions: NWProtocolUDP.Options { udpOptions } + + /// The TLS options for this connection, if any. + private var tlsOptions: NWProtocolTLS.Options? + + /// The state of this connection channel. + internal var state: ChannelState = .idle + + /// The active state, used for safely reporting the channel state across threads. + internal var isActive0 = ManagedAtomic(false) + + /// Whether a call to NWConnection.receive has been made, but the completion + /// handler has not yet been invoked. + internal var outstandingRead: Bool = false + + /// The options for this channel. + internal var options = TransportServicesChannelOptions() + + /// Any pending writes that have yet to be delivered to the network stack. + internal var pendingWrites = CircularBuffer(initialCapacity: 8) + + /// An object to keep track of pending writes and manage our backpressure signaling. + internal var _backpressureManager = BackpressureManager() + + /// The value of SO_REUSEADDR. + internal var reuseAddress = false + + /// The value of SO_REUSEPORT. + internal var reusePort = false + + /// Whether to use peer-to-peer connectivity when connecting to Bonjour services. + internal var enablePeerToPeer = false + + /// The cache of the local and remote socket addresses. Must be accessed using _addressCacheLock. + internal var _addressCache = AddressCache(local: nil, remote: nil) + + internal var addressCache: AddressCache { + get { + return self._addressCacheLock.withLock { + return self._addressCache + } + } + set { + return self._addressCacheLock.withLock { + self._addressCache = newValue + } + } + } + + /// A lock that guards the _addressCache. + internal let _addressCacheLock = NIOLock() + + internal var allowLocalEndpointReuse = false + internal var multipathServiceType: NWParameters.MultipathServiceType = .disabled + + var parameters: NWParameters { + NWParameters(dtls: self.tlsOptions, udp: self.udpOptions) + } + + var _inboundStreamOpen: Bool { + switch self.state { + case .active(.open): + return true + case .idle, .registered, .activating, .active, .inactive: + return false + } + } + + func setChannelSpecificOption0