diff --git a/lib/internal/quic/core.js b/lib/internal/quic/core.js index cbefe4e69d..31935bd49f 100644 --- a/lib/internal/quic/core.js +++ b/lib/internal/quic/core.js @@ -38,6 +38,15 @@ const { owner_symbol, }, } = require('internal/async_hooks'); +const dgram = require('dgram'); +const internalDgram = require('internal/dgram'); + +const { + constants: { + UV_UDP_IPV6ONLY, + UV_UDP_REUSEADDR, + } +} = internalBinding('udp_wrap'); const { writeGeneric, @@ -59,7 +68,6 @@ const { const { codes: { ERR_INVALID_ARG_TYPE, - ERR_INVALID_ARG_VALUE, ERR_INVALID_CALLBACK, ERR_OUT_OF_RANGE, ERR_QUIC_ERROR, @@ -90,8 +98,6 @@ const { constants: { AF_INET, AF_INET6, - UV_UDP_IPV6ONLY, - UV_UDP_REUSEADDR, NGTCP2_ALPN_H3, NGTCP2_MAX_CIDLEN, NGTCP2_MIN_CIDLEN, @@ -221,11 +227,6 @@ function setTransportParams(config) { sessionConfig[IDX_QUIC_SESSION_CONFIG_COUNT] = flags; } -// Called when the socket has been bound and is ready for use -function onSocketReady(fd) { - this[owner_symbol][kReady](fd); -} - // Called when the socket is closed function onSocketClose() { this[owner_symbol].destroy(); @@ -436,7 +437,6 @@ function onSessionSilentClose(statelessReset, code, family) { // Register the callbacks with the QUIC internal binding. setCallbacks({ - onSocketReady, onSocketClose, onSocketError, onSocketServerBusy, @@ -620,6 +620,8 @@ class QuicSocket extends EventEmitter { #type = undefined; #alpn = undefined; #stats = undefined; + #udpSocket = undefined; + #udpHandle = undefined; constructor(options = {}) { const { @@ -667,11 +669,17 @@ class QuicSocket extends EventEmitter { const socketOptions = (validateAddress ? QUICSOCKET_OPTIONS_VALIDATE_ADDRESS : 0) | (validateAddressLRU ? QUICSOCKET_OPTIONS_VALIDATE_ADDRESS_LRU : 0); + this.#udpSocket = dgram.createSocket(type === AF_INET6 ? 'udp6' : 'udp4'); + // TODO(addaleax): Ideally, we would not be needing to store the handle + // separately, and only pass it to the QuicSocketHandle constructor. + this.#udpHandle = this.#udpSocket[internalDgram.kStateSymbol].handle; const handle = new QuicSocketHandle( + this.#udpHandle, socketOptions, retryTokenTimeout, maxConnectionsPerHost); + this.#udpHandle.quicSocket = handle; handle[owner_symbol] = this; this[async_id_symbol] = handle.getAsyncId(); this[kSetHandle](handle); @@ -728,7 +736,7 @@ class QuicSocket extends EventEmitter { const flags = (this.#reuseAddr ? UV_UDP_REUSEADDR : 0) || (this.#ipv6Only ? UV_UDP_IPV6ONLY : 0); - const ret = this[kHandle].bind(this.#type, ip, this.#port || 0, flags); + const ret = this.#udpHandle.bind(ip, this.#port || 0, flags); if (ret) { this.destroy(exceptionWithHostPort(ret, 'bind', ip, this.#port || 0)); return; @@ -736,6 +744,8 @@ class QuicSocket extends EventEmitter { if (typeof callback === 'function') callback(); + + this[kReady](this.#udpHandle.fd); } // The kReady function is called after the socket has been bound to the @@ -913,20 +923,23 @@ class QuicSocket extends EventEmitter { if (handle !== undefined) { this[kSetHandle](); handle[owner_symbol] = undefined; - handle.close((err) => { - // If an error occurs while attempting to close, it will take - // precedence over any original error specified on the args - // TODO(@jasnell): Alternatively we might set the original - // error as a property on the new error. - if (err) error = err; - - // Capture a copy of the stats as they will no longer be - // available once this function returns. - this.#stats = new BigInt64Array(handle.stats); - - if (error) process.nextTick(emit.bind(this, 'error', error)); - process.nextTick(emit.bind(this, 'close')); - }); + handle.ondone = () => { + this.#udpSocket.close((err) => { + // If an error occurs while attempting to close, it will take + // precedence over any original error specified on the args + // TODO(@jasnell): Alternatively we might set the original + // error as a property on the new error. + if (err) error = err; + + // Capture a copy of the stats as they will no longer be + // available once this function returns. + this.#stats = new BigInt64Array(handle.stats); + + if (error) process.nextTick(emit.bind(this, 'error', error)); + process.nextTick(emit.bind(this, 'close')); + }); + }; + handle.waitForPendingCallbacks(); } } @@ -1064,14 +1077,14 @@ class QuicSocket extends EventEmitter { ref() { if (this.#state === kSocketDestroyed) throw new ERR_QUICSOCKET_DESTROYED('ref'); - this[kHandle].ref(); + this.#udpSocket.ref(); return this; } unref() { if (this.#state === kSocketDestroyed) throw new ERR_QUICSOCKET_DESTROYED('unref'); - this[kHandle].unref(); + this.#udpSocket.unref(); return this; } @@ -1082,11 +1095,16 @@ class QuicSocket extends EventEmitter { get address() { const out = {}; if (this.#state !== kSocketDestroyed) { - const err = this[kHandle].getsockname(out); - // If err is returned, socket is not bound. - // Return empty object - if (err) - return {}; + try { + return this.#udpSocket.address(); + } catch (err) { + if (err.code === 'EBADF') { + // If there is an EBADF error, the socket is not bound. + // Return empty object + return {}; + } + throw err; + } } return out; } @@ -1114,85 +1132,49 @@ class QuicSocket extends EventEmitter { setTTL(ttl) { if (this.#state === kSocketDestroyed) throw new ERR_QUICSOCKET_DESTROYED('setTTL'); - if (typeof ttl !== 'number') - throw new ERR_INVALID_ARG_TYPE('ttl', 'number', ttl); - if (ttl < 1 || ttl > 255) - throw new ERR_INVALID_ARG_VALUE('ttl', ttl); - const err = this[kHandle].setTTL(ttl); - if (err) - throw errnoException(err, 'dropMembership'); + this.#udpSocket.setTTL(ttl); return this; } setMulticastTTL(ttl) { if (this.#state === kSocketDestroyed) throw new ERR_QUICSOCKET_DESTROYED('setMulticastTTL'); - if (typeof ttl !== 'number') - throw new ERR_INVALID_ARG_TYPE('ttl', 'number', ttl); - if (ttl < 1 || ttl > 255) - throw new ERR_INVALID_ARG_VALUE('ttl', ttl); - const err = this[kHandle].setMulticastTTL(ttl); - if (err) - throw errnoException(err, 'dropMembership'); + this.#udpSocket.setMulticastTTL(ttl); return this; } setBroadcast(on = true) { if (this.#state === kSocketDestroyed) throw new ERR_QUICSOCKET_DESTROYED('setBroadcast'); - if (typeof on !== 'boolean') - throw new ERR_INVALID_ARG_TYPE('on', 'boolean', on); - const err = this[kHandle].setBroadcast(on); - if (err) - throw errnoException(err, 'dropMembership'); + this.#udpSocket.setBroadcast(on); return this; } setMulticastLoopback(on = true) { if (this.#state === kSocketDestroyed) throw new ERR_QUICSOCKET_DESTROYED('setMulticastLoopback'); - if (typeof on !== 'boolean') - throw new ERR_INVALID_ARG_TYPE('on', 'boolean', on); - const err = this[kHandle].setMulticastLoopback(on); - if (err) - throw errnoException(err, 'dropMembership'); + this.#udpSocket.setMulticastLoopback(on); return this; } setMulticastInterface(iface) { if (this.#state === kSocketDestroyed) throw new ERR_QUICSOCKET_DESTROYED('setMulticastInterface'); - if (typeof iface !== 'string') - throw new ERR_INVALID_ARG_TYPE('iface', 'string', iface); - const err = this[kHandle].setMulticastInterface(iface); - if (err) - throw errnoException(err, 'dropMembership'); + this.#udpSocket.setMulticastInterface(iface); return this; } addMembership(address, iface) { if (this.#state === kSocketDestroyed) throw new ERR_QUICSOCKET_DESTROYED('addMembership'); - if (typeof address !== 'string') - throw new ERR_INVALID_ARG_TYPE('address', 'string', address); - if (typeof iface !== 'string') - throw new ERR_INVALID_ARG_TYPE('iface', 'string', iface); - const err = this[kHandle].addMembership(address, iface); - if (err) - throw errnoException(err, 'addMembership'); + this.#udpSocket.addMembership(address, iface); return this; } dropMembership(address, iface) { if (this.#state === kSocketDestroyed) throw new ERR_QUICSOCKET_DESTROYED('dropMembership'); - if (typeof address !== 'string') - throw new ERR_INVALID_ARG_TYPE('address', 'string', address); - if (typeof iface !== 'string') - throw new ERR_INVALID_ARG_TYPE('iface', 'string', iface); - const err = this[kHandle].dropMembership(address, iface); - if (err) - throw errnoException(err, 'dropMembership'); + this.#udpSocket.dropMembership(address, iface); return this; } diff --git a/src/async_wrap.h b/src/async_wrap.h index 9aacf57c54..2a9e82a6a0 100644 --- a/src/async_wrap.h +++ b/src/async_wrap.h @@ -59,6 +59,7 @@ namespace node { V(QUERYWRAP) \ V(QUICCLIENTSESSION) \ V(QUICSERVERSESSION) \ + V(QUICSENDWRAP) \ V(QUICSOCKET) \ V(QUICSTREAM) \ V(SHUTDOWNWRAP) \ diff --git a/src/env.h b/src/env.h index 3f72d96d9b..c5ee87045e 100644 --- a/src/env.h +++ b/src/env.h @@ -427,7 +427,6 @@ constexpr size_t kFsStatsBufferLength = # define QUIC_ENVIRONMENT_STRONG_PERSISTENT_VALUES(V) \ V(quic_on_socket_close_function, v8::Function) \ V(quic_on_socket_error_function, v8::Function) \ - V(quic_on_socket_ready_function, v8::Function) \ V(quic_on_socket_server_busy_function, v8::Function) \ V(quic_on_session_cert_function, v8::Function) \ V(quic_on_session_client_hello_function, v8::Function) \ diff --git a/src/node_quic.cc b/src/node_quic.cc index 38fb17a547..a10ea14250 100755 --- a/src/node_quic.cc +++ b/src/node_quic.cc @@ -47,7 +47,6 @@ void QuicSetCallbacks(const FunctionCallbackInfo& args) { env->set_quic_on_##callback##_function(fn.As()); \ } while (0) - SETFUNCTION("onSocketReady", socket_ready); SETFUNCTION("onSocketClose", socket_close); SETFUNCTION("onSocketError", socket_error); SETFUNCTION("onSessionReady", session_ready); @@ -164,8 +163,6 @@ void Initialize(Local target, NODE_DEFINE_CONSTANT(constants, SSL_OP_SINGLE_ECDH_USE); NODE_DEFINE_CONSTANT(constants, TLS1_3_VERSION); NODE_DEFINE_CONSTANT(constants, UV_EBADF); - NODE_DEFINE_CONSTANT(constants, UV_UDP_IPV6ONLY); - NODE_DEFINE_CONSTANT(constants, UV_UDP_REUSEADDR); NODE_DEFINE_CONSTANT(constants, IDX_QUIC_SESSION_ACTIVE_CONNECTION_ID_LIMIT); NODE_DEFINE_CONSTANT(constants, IDX_QUIC_SESSION_MAX_STREAM_DATA_BIDI_LOCAL); diff --git a/src/node_quic_socket.cc b/src/node_quic_socket.cc index eb090dbf8e..fe414a4a29 100644 --- a/src/node_quic_socket.cc +++ b/src/node_quic_socket.cc @@ -1,6 +1,7 @@ #include "async_wrap-inl.h" #include "debug_utils.h" #include "env-inl.h" +#include "memory_tracker-inl.h" #include "nghttp2/nghttp2.h" #include "node.h" #include "node_crypto.h" @@ -10,6 +11,7 @@ #include "node_quic_session-inl.h" #include "node_quic_socket.h" #include "node_quic_util.h" +#include "req_wrap-inl.h" #include "util.h" #include "uv.h" #include "v8.h" @@ -31,6 +33,7 @@ using v8::Isolate; using v8::Local; using v8::Number; using v8::Object; +using v8::ObjectTemplate; using v8::PropertyAttribute; using v8::String; using v8::Value; @@ -65,12 +68,11 @@ inline uint32_t GenerateReservedVersion( QuicSocket::QuicSocket( Environment* env, Local wrap, + Local udp_base_wrap, uint64_t retry_token_expiration, size_t max_connections_per_host, uint32_t options) : - HandleWrap(env, wrap, - reinterpret_cast(&handle_), - AsyncWrap::PROVIDER_QUICSOCKET), + AsyncWrap(env, wrap, AsyncWrap::PROVIDER_QUICSOCKET), alloc_info_(MakeAllocator()), options_(options), max_connections_per_host_(max_connections_per_host), @@ -80,7 +82,14 @@ QuicSocket::QuicSocket( env->isolate(), sizeof(socket_stats_) / sizeof(uint64_t), reinterpret_cast(&socket_stats_)) { - CHECK_EQ(uv_udp_init(env->event_loop(), &handle_), 0); + MakeWeak(); + + udp_ = static_cast( + udp_base_wrap->GetAlignedPointerFromInternalField( + UDPWrapBase::kUDPWrapBaseField)); + CHECK_NOT_NULL(udp_); + udp_->set_listener(this); + Debug(this, "New QuicSocket created."); EntropySource(token_secret_.data(), token_secret_.size()); @@ -147,100 +156,18 @@ void QuicSocket::AssociateCID( dcid_to_scid_.emplace(cid->ToStr(), scid->ToStr()); } -int QuicSocket::Bind( - const char* address, - uint32_t port, - uint32_t flags, - int family) { - Debug(this, - "Binding to address %s, port %d, with flags %d, and family %d", - address, port, flags, family); - - HandleScope scope(env()->isolate()); - Context::Scope context_scope(env()->context()); - - sockaddr_storage addr; - int err = SocketAddress::ToSockAddr(family, address, port, &addr); - if (err != 0) - return err; - - Local arg = Undefined(env()->isolate()); - - err = - uv_udp_bind( - &handle_, - reinterpret_cast(&addr), - flags); - if (err != 0) { - Debug(this, "Bind failed. Error %d", err); - arg = Integer::New(env()->isolate(), err); - MakeCallback(env()->quic_on_socket_error_function(), 1, &arg); - return 0; - } - - local_address_.Set(&handle_); +void QuicSocket::OnAfterBind() { + sockaddr_storage addr_buf; + sockaddr* addr = reinterpret_cast(&addr_buf); + int addrlen = sizeof(addr_buf); -#if !defined(_WIN32) - int fd = UV_EBADF; - uv_fileno(reinterpret_cast(&handle_), &fd); - if (fd != UV_EBADF) - arg = Integer::New(env()->isolate(), fd); -#endif + CHECK_EQ(udp_->GetSockName(addr, &addrlen), 0); + local_address_.Copy(addr); + Debug(this, "Socket bound\n"); - MakeCallback(env()->quic_on_socket_ready_function(), 1, &arg); socket_stats_.bound_at = uv_hrtime(); - return 0; } -// If there are no pending QuicSocket::SendWrap callbacks, the -// QuicSocket instance will be closed immediately and the -// close callback will be invoked. Otherwise, the QuicSocket -// will be marked as pending close and will close as soon as -// the final remaining QuicSocket::SendWrap callback is invoked. -// This design ensures that packets that have been sent down to -// the libuv level are processed even tho we are shutting down. -// -// TODO(@jasnell): We will want to implement an additional function -// that will close things down immediately, canceling any still -// pending operations. -void QuicSocket::Close(Local close_callback) { - if (!IsInitialized() || IsFlagSet(QUICSOCKET_FLAGS_PENDING_CLOSE)) - return; - SetFlag(QUICSOCKET_FLAGS_PENDING_CLOSE); - Debug(this, "Closing"); - - CHECK_EQ(false, persistent().IsEmpty()); - if (!close_callback.IsEmpty() && close_callback->IsFunction()) { - object()->Set(env()->context(), - env()->handle_onclose_symbol(), - close_callback).Check(); - } - - // Attempt to close immediately. - MaybeClose(); -} - -// A QuicSocket can close if there are no pending udp send -// callbacks and QuicSocket::Close() has been called. -void QuicSocket::MaybeClose() { - if (!IsInitialized() || - !IsFlagSet(QUICSOCKET_FLAGS_PENDING_CLOSE) || - HasPendingCallbacks()) - return; - - CHECK_EQ(false, persistent().IsEmpty()); - - Debug(this, "Closing the libuv handle"); - - // Close the libuv handle first. The OnClose handler - // will free the QuicSocket instance after it invokes - // the close callback, letting the JavaScript side know - // that the handle is being freed. - uv_close(GetHandle(), OnClose); - MarkAsClosing(); -} - - void QuicSocket::DisassociateCID(QuicCID* cid) { Debug(this, "Removing associations for cid %s", cid->ToHex().c_str()); dcid_to_scid_.erase(cid->ToStr()); @@ -276,38 +203,41 @@ void QuicSocket::StopListening() { SetFlag(QUICSOCKET_FLAGS_SERVER_LISTENING, false); } -void QuicSocket::OnAlloc( - uv_handle_t* handle, - size_t suggested_size, - uv_buf_t* buf) { - QuicSocket* socket = - ContainerOf(&QuicSocket::handle_, reinterpret_cast(handle)); - *buf = socket->env()->AllocateManaged(suggested_size).release(); +void QuicSocket::WaitForPendingCallbacks() { + if (!HasPendingCallbacks()) { + Debug(this, "No pending callbacks, calling ondone immediately"); + MakeCallback(env()->ondone_string(), 0, nullptr); + return; + } + SetFlag(QUICSOCKET_FLAGS_WAITING_FOR_CALLBACKS); + Debug(this, "Waiting for pending callbacks"); +} + +uv_buf_t QuicSocket::OnAlloc(size_t suggested_size) { + return env()->AllocateManaged(suggested_size).release(); } void QuicSocket::OnRecv( - uv_udp_t* handle, ssize_t nread, - const uv_buf_t* buf_, + const uv_buf_t& buf_, const struct sockaddr* addr, unsigned int flags) { - QuicSocket* socket = ContainerOf(&QuicSocket::handle_, handle); - AllocatedBuffer buf(socket->env(), *buf_); + AllocatedBuffer buf(env(), buf_); if (nread == 0) return; if (nread < 0) { - Debug(socket, "Reading data from UDP socket failed. Error %d", nread); - Environment* env = socket->env(); + Debug(this, "Reading data from UDP socket failed. Error %d", nread); + Environment* env = this->env(); HandleScope scope(env->isolate()); Context::Scope context_scope(env->context()); Local arg = Number::New(env->isolate(), static_cast(nread)); - socket->MakeCallback(env->quic_on_socket_error_function(), 1, &arg); + MakeCallback(env->quic_on_socket_error_function(), 1, &arg); return; } - socket->Receive(nread, std::move(buf), addr, flags); + Receive(nread, std::move(buf), addr, flags); } void QuicSocket::Receive( @@ -435,14 +365,11 @@ void QuicSocket::Receive( } int QuicSocket::ReceiveStart() { - int err = uv_udp_recv_start(&handle_, OnAlloc, OnRecv); - if (err == UV_EALREADY) - err = 0; - return err; + return udp_->RecvStart(); } int QuicSocket::ReceiveStop() { - return uv_udp_recv_stop(&handle_); + return udp_->RecvStop(); } void QuicSocket::RemoveSession(QuicCID* cid, const sockaddr* addr) { @@ -494,18 +421,13 @@ void QuicSocket::SendInitialConnectionClose( &alloc_info_, nullptr); - SendWrapStack* req = - new SendWrapStack( - this, - addr, - NGTCP2_MAX_PKTLEN_IPV6, - "initial cc"); + MallocedBuffer buf(NGTCP2_MAX_PKTLEN_IPV6); ssize_t nwrite = ngtcp2_conn_write_connection_close( conn, *path, - req->buffer(), + reinterpret_cast(buf.data), NGTCP2_MAX_PKTLEN_IPV6, error_code, uv_hrtime()); @@ -514,10 +436,10 @@ void QuicSocket::SendInitialConnectionClose( // is serialized. We won't be using this one any longer. ngtcp2_conn_del(conn); - if (nwrite > 0) { - req->SetLength(nwrite); - if (req->Send() != 0) delete req; // TODO(addaleax): Better error handling? - } + if (nwrite <= 0) + return; + buf.Realloc(nwrite); + Send(addr, std::move(buf), "initial cc"); } void QuicSocket::SendVersionNegotiation( @@ -525,13 +447,6 @@ void QuicSocket::SendVersionNegotiation( QuicCID* dcid, QuicCID* scid, const sockaddr* addr) { - SendWrapStack* req = - new SendWrapStack( - this, - addr, - NGTCP2_MAX_PKTLEN_IPV6, - "version negotiation"); - std::array sv; sv[0] = GenerateReservedVersion(addr, version); sv[1] = NGTCP2_PROTO_VER; @@ -539,8 +454,9 @@ void QuicSocket::SendVersionNegotiation( uint8_t unused_random; EntropySource(&unused_random, 1); + MallocedBuffer buf(NGTCP2_MAX_PKTLEN_IPV6); ssize_t nwrite = ngtcp2_pkt_write_version_negotiation( - req->buffer(), + reinterpret_cast(buf.data), NGTCP2_MAX_PKTLEN_IPV6, unused_random, dcid->data(), @@ -549,10 +465,10 @@ void QuicSocket::SendVersionNegotiation( scid->length(), sv.data(), sv.size()); - if (nwrite < 0) + if (nwrite <= 0) return; - req->SetLength(nwrite); - if (req->Send() != 0) delete req; // TODO(addaleax): Better error handling? + buf.Realloc(nwrite); + Send(addr, std::move(buf), "version negotiation"); } bool QuicSocket::SendRetry( @@ -560,13 +476,6 @@ bool QuicSocket::SendRetry( QuicCID* dcid, QuicCID* scid, const sockaddr* addr) { - SendWrapStack* req = - new SendWrapStack( - this, - addr, - NGTCP2_MAX_PKTLEN_IPV4, - "retry"); - std::array token; size_t tokenlen = token.size(); @@ -592,9 +501,10 @@ bool QuicSocket::SendRetry( EntropySource(hd.scid.data, NGTCP2_SV_SCIDLEN); + MallocedBuffer buf(NGTCP2_MAX_PKTLEN_IPV4); ssize_t nwrite = ngtcp2_pkt_write_retry( - req->buffer(), + reinterpret_cast(buf.data), NGTCP2_MAX_PKTLEN_IPV4, &hd, **dcid, @@ -602,12 +512,7 @@ bool QuicSocket::SendRetry( tokenlen); if (nwrite <= 0) return false; - - req->SetLength(nwrite); - - int err = req->Send(); - if (err != 0) delete req; - return err == 0; + return Send(addr, std::move(buf), "retry") == 0; } namespace { @@ -771,39 +676,30 @@ void QuicSocket::SetServerBusy(bool on) { MakeCallback(env()->quic_on_socket_server_busy_function(), 1, &arg); } -int QuicSocket::SetTTL(int ttl) { - Debug(this, "Setting UDP TTL to %d", ttl); - return uv_udp_set_ttl(&handle_, ttl); -} - -int QuicSocket::SetMulticastTTL(int ttl) { - Debug(this, "Setting UDP Multicast TTL to %d", ttl); - return uv_udp_set_multicast_ttl(&handle_, ttl); -} - -int QuicSocket::SetBroadcast(bool on) { - Debug(this, "Turning UDP Broadcast %s", on ? "on" : "off"); - return uv_udp_set_broadcast(&handle_, on ? 1 : 0); -} - -int QuicSocket::SetMulticastLoopback(bool on) { - Debug(this, "Turning UDP Multicast Loopback %s", on ? "on" : "off"); - return uv_udp_set_multicast_loop(&handle_, on ? 1 : 0); +QuicSocket::SendWrap::SendWrap( + Environment* env, + Local req_wrap_obj, + size_t total_length) + : ReqWrap(env, req_wrap_obj, PROVIDER_QUICSOCKET), + total_length_(total_length) { } -int QuicSocket::SetMulticastInterface(const char* iface) { - Debug(this, "Setting the UDP Multicast Interface to %s", iface); - return uv_udp_set_multicast_interface(&handle_, iface); +std::string QuicSocket::SendWrap::MemoryInfoName() const { + return "QuicSendWrap"; } -int QuicSocket::AddMembership(const char* address, const char* iface) { - Debug(this, "Joining UDP group: address %s, iface %s", address, iface); - return uv_udp_set_membership(&handle_, address, iface, UV_JOIN_GROUP); +void QuicSocket::SendWrap::MemoryInfo(MemoryTracker* tracker) const { + tracker->TrackField("session", session_); + tracker->TrackField("buffer", buffer_); + tracker->TrackField("data", data_); } -int QuicSocket::DropMembership(const char* address, const char* iface) { - Debug(this, "Leaving UDP group: address %s, iface %s", address, iface); - return uv_udp_set_membership(&handle_, address, iface, UV_LEAVE_GROUP); +ReqWrap* QuicSocket::CreateSendWrap(size_t msg_size) { + HandleScope handle_scope(env()->isolate()); + Local obj; + if (!env()->quicsocketsendwrap_constructor_template() + ->NewInstance(env()->context()).ToLocal(&obj)) return nullptr; + return last_created_send_wrap_ = new SendWrap(env(), obj, msg_size); } int QuicSocket::SendPacket( @@ -818,190 +714,135 @@ int QuicSocket::SendPacket( if (buffer->Length() == 0 || buffer->RemainingLength() == 0) return 0; - char host[INET6_ADDRSTRLEN]; - SocketAddress::GetAddress(dest, host, sizeof(host)); - Debug(this, "Sending to %s at port %d", host, SocketAddress::GetPort(dest)); - - QuicSocket::SendWrap* wrap = - new QuicSocket::SendWrap( - this, - dest, - buffer, - session, - diagnostic_label); - int err = wrap->Send(); - if (err != 0) delete wrap; - return err; -} + { + char host[INET6_ADDRSTRLEN]; + SocketAddress::GetAddress(dest, host, sizeof(host)); + Debug(this, "Sending to %s at port %d", host, SocketAddress::GetPort(dest)); + } -void QuicSocket::OnSend( - int status, - size_t length, - const char* diagnostic_label) { - IncrementSocketStat( - length, - &socket_stats_, - &socket_stats::bytes_sent); - IncrementSocketStat( - 1, - &socket_stats_, - &socket_stats::packets_sent); + // Remaining Length should never be zero at this point + CHECK_GT(buffer->RemainingLength(), 0); - Debug(this, "Packet sent status: %d (label: %s)", - status, - diagnostic_label != nullptr ? diagnostic_label : "unspecified"); + std::vector vec; + size_t total_length; + size_t len = buffer->DrainInto(&vec, &total_length); - DecrementPendingCallbacks(); - MaybeClose(); -} + // len should never be zero + CHECK_GT(len, 0); -QuicSocket::SendWrapBase::SendWrapBase( - QuicSocket* socket, - const sockaddr* dest, - const char* diagnostic_label) : - socket_(socket), - diagnostic_label_(diagnostic_label) { - req_.data = this; - address_.Copy(dest); - socket->IncrementPendingCallbacks(); -} + Debug(this, "Sending %" PRIu64 " bytes (label: %s)", + total_length, diagnostic_label); + // If DiagnosticPacketLoss returns true, it will call Done() internally + if (UNLIKELY(IsDiagnosticPacketLoss(tx_loss_))) { + Debug(this, "Simulating transmitted packet loss."); + return 0; + } -void QuicSocket::SendWrapBase::OnSend(uv_udp_send_t* req, int status) { - std::unique_ptr wrap( - static_cast(req->data)); - wrap->Done(status); -} + last_created_send_wrap_ = nullptr; + int err = udp_->Send(vec.data(), vec.size(), dest); -bool QuicSocket::SendWrapBase::IsDiagnosticPacketLoss() { - if (Socket()->IsDiagnosticPacketLoss(Socket()->tx_loss_)) { - Debug(Socket(), "Simulating transmitted packet loss."); - Done(0); - return true; - } - return false; -} + Debug(this, "Advancing read head %" PRIu64 " status = %d", + total_length, err); + buffer->SeekHeadOffset(total_length); -void QuicSocket::SendWrapBase::Done(int status) { - socket_->env()->DecreaseWaitingRequestCounter(); - socket_->OnSend(status, Length(), diagnostic_label()); -} + if (err != 0) { + if (err > 0) err = 0; + OnSend(err, total_length, buffer, diagnostic_label); + } else { + IncrementPendingCallbacks(); -QuicSocket::SendWrapStack::SendWrapStack( - QuicSocket* socket, - const sockaddr* dest, - size_t len, - const char* diagnostic_label) : - SendWrapBase(socket, dest, diagnostic_label) { - buf_.AllocateSufficientStorage(len); + CHECK_NOT_NULL(last_created_send_wrap_); + last_created_send_wrap_->set_diagnostic_label(diagnostic_label); + last_created_send_wrap_->set_quic_buffer(buffer); + last_created_send_wrap_->set_session(session); + } + return err; } -int QuicSocket::SendWrapStack::Send() { - Debug(Socket(), "Sending %" PRIu64 " bytes (label: %s)", - buf_.length(), - diagnostic_label()); +int QuicSocket::Send(const sockaddr* addr, + MallocedBuffer&& buf, + const char* diagnostic_label) { + Debug(this, "Sending %" PRIu64 " bytes (label: %s)", + buf.size, + diagnostic_label); - CHECK_GT(buf_.length(), 0); + CHECK_GT(buf.size, 0); // If DiagnosticPacketLoss returns true, it will call Done() internally - if (UNLIKELY(IsDiagnosticPacketLoss())) + if (UNLIKELY(IsDiagnosticPacketLoss(tx_loss_))) { + Debug(this, "Simulating transmitted packet loss."); return 0; + } - uv_buf_t buf = - uv_buf_init( - reinterpret_cast(*buf_), - buf_.length()); - - int err = uv_udp_send( - req(), - &Socket()->handle_, - &buf, 1, - **Address(), - OnSend); - // As this does not inherit from ReqWrap, we have to manage the request - // counter manually. - if (err == 0) Socket()->env()->IncreaseWaitingRequestCounter(); - return err; -} - -// The QuicSocket::SendWrap will maintain a std::weak_ref -// pointer to the buffer given to it. -QuicSocket::SendWrap::SendWrap( - QuicSocket* socket, - SocketAddress* dest, - QuicBuffer* buffer, - BaseObjectPtr session, - const char* diagnostic_label) - : SendWrap(socket, **dest, buffer, session, diagnostic_label) {} + uv_buf_t buf_send = uv_buf_init(buf.data, buf.size); -QuicSocket::SendWrap::SendWrap( - QuicSocket* socket, - const sockaddr* dest, - QuicBuffer* buffer, - BaseObjectPtr session, - const char* diagnostic_label) - : SendWrapBase(socket, dest, diagnostic_label), - buffer_(buffer), - session_(session) {} - -void QuicSocket::SendWrap::Done(int status) { - // If the weak_ref to the QuicBuffer is still valid - // consume the data, otherwise, do nothing - if (status == 0) { - Debug(Socket(), "Consuming %" PRId64 " bytes (label: %s)", - length_, - diagnostic_label()); - buffer_->Consume(length_); + last_created_send_wrap_ = nullptr; + int err = udp_->Send(&buf_send, 1, addr); + if (err != 0) { + if (err > 0) err = 0; + OnSend(err, buf.size, nullptr, diagnostic_label); } else { - Debug(Socket(), "Cancelling %" PRId64 " bytes (status: %d, label: %s)", - length_, - status, - diagnostic_label()); - buffer_->Cancel(status); + IncrementPendingCallbacks(); + + CHECK_NOT_NULL(last_created_send_wrap_); + last_created_send_wrap_->set_diagnostic_label(diagnostic_label); + last_created_send_wrap_->set_data(std::move(buf)); } - SendWrapBase::Done(status); + return err; } -// Sending will take the current content of the QuicBuffer -// and forward it off to the uv_udp_t handle. -int QuicSocket::SendWrap::Send() { - // Remaining Length should never be zero at this point - CHECK_GT(buffer_->RemainingLength(), 0); - - std::vector vec; - size_t len = buffer_->DrainInto(&vec, &length_); - - // len should never be zero - CHECK_GT(len, 0); +void QuicSocket::OnSend( + int status, + size_t length, + QuicBuffer* buffer, + const char* diagnostic_label) { + if (buffer != nullptr) { + // If the weak_ref to the QuicBuffer is still valid + // consume the data, otherwise, do nothing + if (status == 0) { + Debug(this, "Consuming %" PRId64 " bytes (label: %s)", + length, + diagnostic_label); + buffer->Consume(length); + } else { + Debug(this, "Cancelling %" PRId64 " bytes (status: %d, label: %s)", + length, + status, + diagnostic_label); + buffer->Cancel(status); + } + } - Debug(Socket(), - "Sending %" PRIu64 " bytes (label: %s)", - length_, - diagnostic_label()); + IncrementSocketStat( + length, + &socket_stats_, + &socket_stats::bytes_sent); + IncrementSocketStat( + 1, + &socket_stats_, + &socket_stats::packets_sent); - // If DiagnosticPacketLoss returns true, it will call Done() internally - if (UNLIKELY(IsDiagnosticPacketLoss())) - return 0; + Debug(this, "Packet sent status: %d (label: %s)", + status, + diagnostic_label != nullptr ? diagnostic_label : "unspecified"); - int err = uv_udp_send( - req(), - &(Socket()->handle_), - vec.data(), - vec.size(), - **Address(), - OnSend); - - if (err == 0) { - // As this does not inherit from ReqWrap, we have to manage the request - // counter manually. - Socket()->env()->IncreaseWaitingRequestCounter(); - Debug(Socket(), "Advancing read head %" PRIu64, length_); - buffer_->SeekHeadOffset(length_); + if (!HasPendingCallbacks() && + IsFlagSet(QUICSOCKET_FLAGS_WAITING_FOR_CALLBACKS)) { + HandleScope handle_scope(env()->isolate()); + Context::Scope context_scope(env()->context()); + MakeCallback(env()->ondone_string(), 0, nullptr); } - return err; } - +void QuicSocket::OnSendDone(ReqWrap* wrap, int status) { + std::unique_ptr req_wrap(static_cast(wrap)); + DecrementPendingCallbacks(); + OnSend(status, + req_wrap->total_length(), + req_wrap->quic_buffer(), + req_wrap->diagnostic_label()); +} bool QuicSocket::IsDiagnosticPacketLoss(double prob) { if (LIKELY(prob == 0.0)) return false; @@ -1032,14 +873,17 @@ namespace { void NewQuicSocket(const FunctionCallbackInfo& args) { Environment* env = Environment::GetCurrent(args); CHECK(args.IsConstructCall()); + CHECK(args[0]->IsObject()); + CHECK_GE(args[0].As()->InternalFieldCount(), + UDPWrapBase::kUDPWrapBaseField); uint32_t options; uint32_t retry_token_expiration; uint32_t max_connections_per_host; - if (!args[0]->Uint32Value(env->context()).To(&options) || - !args[1]->Uint32Value(env->context()).To(&retry_token_expiration) || - !args[2]->Uint32Value(env->context()).To(&max_connections_per_host)) { + if (!args[1]->Uint32Value(env->context()).To(&options) || + !args[2]->Uint32Value(env->context()).To(&retry_token_expiration) || + !args[3]->Uint32Value(env->context()).To(&max_connections_per_host)) { return; } CHECK_GE(retry_token_expiration, MIN_RETRYTOKEN_EXPIRATION); @@ -1048,6 +892,7 @@ void NewQuicSocket(const FunctionCallbackInfo& args) { new QuicSocket( env, args.This(), + args[0].As(), retry_token_expiration, max_connections_per_host, options); @@ -1075,60 +920,12 @@ void QuicSocketSetDiagnosticPacketLoss( socket->SetDiagnosticPacketLoss(rx, tx); } -void QuicSocketAddMembership(const FunctionCallbackInfo& args) { - Environment* env = Environment::GetCurrent(args); - QuicSocket* socket; - ASSIGN_OR_RETURN_UNWRAP(&socket, args.Holder(), - args.GetReturnValue().Set(UV_EBADF)); - CHECK_EQ(args.Length(), 2); - CHECK(args[0]->IsString()); - CHECK(args[1]->IsString()); - - Utf8Value address(env->isolate(), args[0]); - Utf8Value iface(env->isolate(), args[1]); - args.GetReturnValue().Set(socket->AddMembership(*address, *iface)); -} - -void QuicSocketBind(const FunctionCallbackInfo& args) { - Environment* env = Environment::GetCurrent(args); - QuicSocket* socket; - ASSIGN_OR_RETURN_UNWRAP(&socket, args.Holder(), - args.GetReturnValue().Set(UV_EBADF)); - - CHECK_EQ(args.Length(), 4); - - node::Utf8Value address(args.GetIsolate(), args[1]); - int32_t type; - uint32_t port, flags; - if (!args[0]->Int32Value(env->context()).To(&type) || - !args[2]->Uint32Value(env->context()).To(&port) || - !args[3]->Uint32Value(env->context()).To(&flags)) - return; - CHECK(type == AF_INET || type == AF_INET6); - - args.GetReturnValue().Set(socket->Bind(*address, port, flags, type)); -} - void QuicSocketDestroy(const FunctionCallbackInfo& args) { QuicSocket* socket; ASSIGN_OR_RETURN_UNWRAP(&socket, args.Holder()); socket->ReceiveStop(); } -void QuicSocketDropMembership(const FunctionCallbackInfo& args) { - Environment* env = Environment::GetCurrent(args); - QuicSocket* socket; - ASSIGN_OR_RETURN_UNWRAP(&socket, args.Holder(), - args.GetReturnValue().Set(UV_EBADF)); - CHECK_EQ(args.Length(), 2); - CHECK(args[0]->IsString()); - CHECK(args[1]->IsString()); - - Utf8Value address(env->isolate(), args[0]); - Utf8Value iface(env->isolate(), args[1]); - args.GetReturnValue().Set(socket->DropMembership(*address, *iface)); -} - void QuicSocketListen(const FunctionCallbackInfo& args) { Environment* env = Environment::GetCurrent(args); QuicSocket* socket; @@ -1193,34 +990,6 @@ void QuicSocketReceiveStop(const FunctionCallbackInfo& args) { args.GetReturnValue().Set(socket->ReceiveStop()); } -void QuicSocketSetBroadcast(const FunctionCallbackInfo& args) { - QuicSocket* socket; - ASSIGN_OR_RETURN_UNWRAP(&socket, args.Holder(), - args.GetReturnValue().Set(UV_EBADF)); - CHECK_EQ(args.Length(), 1); - args.GetReturnValue().Set(socket->SetBroadcast(args[0]->IsTrue())); -} - -void QuicSocketSetMulticastInterface(const FunctionCallbackInfo& args) { - Environment* env = Environment::GetCurrent(args); - QuicSocket* socket; - ASSIGN_OR_RETURN_UNWRAP(&socket, args.Holder(), - args.GetReturnValue().Set(UV_EBADF)); - CHECK_EQ(args.Length(), 1); - CHECK(args[0]->IsString()); - - Utf8Value iface(env->isolate(), args[0]); - args.GetReturnValue().Set(socket->SetMulticastInterface(*iface)); -} - -void QuicSocketSetMulticastLoopback(const FunctionCallbackInfo& args) { - QuicSocket* socket; - ASSIGN_OR_RETURN_UNWRAP(&socket, args.Holder(), - args.GetReturnValue().Set(UV_EBADF)); - CHECK_EQ(args.Length(), 1); - args.GetReturnValue().Set(socket->SetMulticastLoopback(args[0]->IsTrue())); -} - void QuicSocketSetServerBusy(const FunctionCallbackInfo& args) { QuicSocket* socket; ASSIGN_OR_RETURN_UNWRAP(&socket, args.Holder()); @@ -1228,29 +997,13 @@ void QuicSocketSetServerBusy(const FunctionCallbackInfo& args) { socket->SetServerBusy(args[0]->IsTrue()); } -void QuicSocketSetMulticastTTL(const FunctionCallbackInfo& args) { - Environment* env = Environment::GetCurrent(args); +void QuicSocketWaitForPendingCallbacks( + const FunctionCallbackInfo& args) { QuicSocket* socket; - ASSIGN_OR_RETURN_UNWRAP(&socket, args.Holder(), - args.GetReturnValue().Set(UV_EBADF)); - CHECK_EQ(args.Length(), 1); - int ttl; - if (!args[0]->Int32Value(env->context()).To(&ttl)) - return; - args.GetReturnValue().Set(socket->SetMulticastTTL(ttl)); + ASSIGN_OR_RETURN_UNWRAP(&socket, args.Holder()); + socket->WaitForPendingCallbacks(); } -void QuicSocketSetTTL(const FunctionCallbackInfo& args) { - Environment* env = Environment::GetCurrent(args); - QuicSocket* socket; - ASSIGN_OR_RETURN_UNWRAP(&socket, args.Holder(), - args.GetReturnValue().Set(UV_EBADF)); - CHECK_EQ(args.Length(), 1); - int ttl; - if (!args[0]->Int32Value(env->context()).To(&ttl)) - return; - args.GetReturnValue().Set(socket->SetTTL(ttl)); -} } // namespace void QuicSocket::Initialize( @@ -1263,21 +1016,9 @@ void QuicSocket::Initialize( socket->SetClassName(class_name); socket->InstanceTemplate()->SetInternalFieldCount(1); socket->InstanceTemplate()->Set(env->owner_symbol(), Null(isolate)); - env->SetProtoMethod(socket, - "addMembership", - QuicSocketAddMembership); - env->SetProtoMethod(socket, - "bind", - QuicSocketBind); env->SetProtoMethod(socket, "destroy", QuicSocketDestroy); - env->SetProtoMethod(socket, - "dropMembership", - QuicSocketDropMembership); - env->SetProtoMethod(socket, - "getsockname", - node::GetSockOrPeerName); env->SetProtoMethod(socket, "listen", QuicSocketListen); @@ -1290,30 +1031,23 @@ void QuicSocket::Initialize( env->SetProtoMethod(socket, "setDiagnosticPacketLoss", QuicSocketSetDiagnosticPacketLoss); - env->SetProtoMethod(socket, - "setTTL", - QuicSocketSetTTL); - env->SetProtoMethod(socket, - "setBroadcast", - QuicSocketSetBroadcast); - env->SetProtoMethod(socket, - "setMulticastInterface", - QuicSocketSetMulticastInterface); - env->SetProtoMethod(socket, - "setMulticastTTL", - QuicSocketSetMulticastTTL); - env->SetProtoMethod(socket, - "setMulticastLoopback", - QuicSocketSetMulticastLoopback); env->SetProtoMethod(socket, "setServerBusy", QuicSocketSetServerBusy); env->SetProtoMethod(socket, "stopListening", QuicSocketStopListening); + env->SetProtoMethod(socket, + "waitForPendingCallbacks", + QuicSocketWaitForPendingCallbacks); socket->Inherit(HandleWrap::GetConstructorTemplate(env)); target->Set(context, class_name, socket->GetFunction(env->context()).ToLocalChecked()).FromJust(); + + // TODO(addaleax): None of these templates actually are constructor templates. + Local sendwrap_template = ObjectTemplate::New(isolate); + sendwrap_template->SetInternalFieldCount(1); + env->set_quicsocketsendwrap_constructor_template(sendwrap_template); } } // namespace quic diff --git a/src/node_quic_socket.h b/src/node_quic_socket.h index 75a578dd60..551ba9907c 100644 --- a/src/node_quic_socket.h +++ b/src/node_quic_socket.h @@ -10,7 +10,7 @@ #include "node_quic_session.h" #include "node_quic_util.h" #include "env.h" -#include "handle_wrap.h" +#include "udp_wrap.h" #include "v8.h" #include "uv.h" @@ -43,7 +43,8 @@ enum QuicSocketOptions : uint32_t { QUICSOCKET_OPTIONS_VALIDATE_ADDRESS_LRU = 0x2, }; -class QuicSocket : public HandleWrap, +class QuicSocket : public AsyncWrap, + public UDPListener, public mem::NgLibMemoryManager { public: static void Initialize( @@ -54,6 +55,7 @@ class QuicSocket : public HandleWrap, QuicSocket( Environment* env, Local wrap, + Local udp_base_wrap, uint64_t retry_token_expiration, size_t max_connections_per_host, uint32_t options = 0); @@ -61,30 +63,16 @@ class QuicSocket : public HandleWrap, SocketAddress* GetLocalAddress() { return &local_address_; } - void Close( - v8::Local close_callback = v8::Local()) override; - void MaybeClose(); - int AddMembership( - const char* address, - const char* iface); void AddSession( QuicCID* cid, BaseObjectPtr session); void AssociateCID( QuicCID* cid, QuicCID* scid); - int Bind( - const char* address, - uint32_t port, - uint32_t flags, - int family); void DisassociateCID( QuicCID* cid); - int DropMembership( - const char* address, - const char* iface); void Listen( crypto::SecureContext* context, const sockaddr* preferred_address = nullptr, @@ -97,16 +85,6 @@ class QuicSocket : public HandleWrap, const sockaddr* addr); void ReportSendError( int error); - int SetBroadcast( - bool on); - int SetMulticastInterface( - const char* iface); - int SetMulticastLoopback( - bool on); - int SetMulticastTTL( - int ttl); - int SetTTL( - int ttl); int SendPacket( const sockaddr* dest, QuicBuffer* buf, @@ -115,13 +93,12 @@ class QuicSocket : public HandleWrap, void SetServerBusy(bool on); void SetDiagnosticPacketLoss(double rx = 0.0, double tx = 0.0); void StopListening(); + void WaitForPendingCallbacks(); crypto::SecureContext* GetServerSecureContext() { return server_secure_context_; } - const uv_udp_t* operator*() const { return &handle_; } - void MemoryInfo(MemoryTracker* tracker) const override; SET_MEMORY_INFO_NAME(QuicSocket) SET_SELF_SIZE(QuicSocket) @@ -131,6 +108,16 @@ class QuicSocket : public HandleWrap, void IncreaseAllocatedSize(size_t size); void DecreaseAllocatedSize(size_t size); + // Implementation for UDPWrapListener + uv_buf_t OnAlloc(size_t suggested_size) override; + void OnRecv(ssize_t nread, + const uv_buf_t& buf, + const sockaddr* addr, + unsigned int flags) override; + ReqWrap* CreateSendWrap(size_t msg_size) override; + void OnSendDone(ReqWrap* wrap, int status) override; + void OnAfterBind() override; + private: static void OnAlloc( uv_handle_t* handle, @@ -165,6 +152,7 @@ class QuicSocket : public HandleWrap, void OnSend( int status, size_t length, + QuicBuffer* buffer, const char* diagnostic_label); void SetValidatedAddress(const sockaddr* addr); @@ -192,25 +180,17 @@ class QuicSocket : public HandleWrap, void DecrementPendingCallbacks() { pending_callbacks_--; } bool HasPendingCallbacks() { return pending_callbacks_ > 0; } - template - friend void node::GetSockOrPeerName( - const v8::FunctionCallbackInfo&); - // Returns true if, and only if, diagnostic packet loss is enabled // and the current packet should be artificially considered lost. bool IsDiagnosticPacketLoss(double prob); - // Fields and TypeDefs - typedef uv_udp_t HandleType; - enum QuicSocketFlags : uint32_t { QUICSOCKET_FLAGS_NONE = 0x0, // Indicates that the QuicSocket has entered a graceful // closing phase, indicating that no additional QUICSOCKET_FLAGS_GRACEFUL_CLOSE = 0x1, - QUICSOCKET_FLAGS_PENDING_CLOSE = 0x2, + QUICSOCKET_FLAGS_WAITING_FOR_CALLBACKS = 0x2, QUICSOCKET_FLAGS_SERVER_LISTENING = 0x4, QUICSOCKET_FLAGS_SERVER_BUSY = 0x8, }; @@ -238,7 +218,7 @@ class QuicSocket : public HandleWrap, } ngtcp2_mem alloc_info_; - uv_udp_t handle_; + UDPWrapBase* udp_; uint32_t flags_ = QUICSOCKET_FLAGS_NONE; uint32_t options_; uint32_t server_options_; @@ -330,96 +310,37 @@ class QuicSocket : public HandleWrap, access(a, mems...) += delta; } - class SendWrapBase { + class SendWrap : public ReqWrap { public: - SendWrapBase( - QuicSocket* socket, - const sockaddr* dest, - const char* diagnostic_label = nullptr); - - virtual ~SendWrapBase() = default; - - virtual void Done(int status); - - virtual int Send() = 0; - - uv_udp_send_t* operator*() { return &req_; } - - uv_udp_send_t* req() { return &req_; } - - QuicSocket* Socket() { return socket_.get(); } - - SocketAddress* Address() { return &address_; } - + SendWrap(Environment* env, + v8::Local req_wrap_obj, + size_t total_length_); + + void set_data(MallocedBuffer&& data) { data_ = std::move(data); } + void set_quic_buffer(QuicBuffer* buffer) { buffer_ = buffer; } + void set_session(BaseObjectPtr session) { session_ = session; } + void set_diagnostic_label(const char* label) { diagnostic_label_ = label; } + QuicBuffer* quic_buffer() const { return buffer_; } const char* diagnostic_label() const { return diagnostic_label_; } + size_t total_length() const { return total_length_; } - static void OnSend( - uv_udp_send_t* req, - int status); - - virtual size_t Length() = 0; - - bool IsDiagnosticPacketLoss(); + SET_SELF_SIZE(SendWrap); + std::string MemoryInfoName() const override; + void MemoryInfo(MemoryTracker* tracker) const override; private: - uv_udp_send_t req_; - BaseObjectPtr socket_; - SocketAddress address_; - const char* diagnostic_label_; - }; - - // The SendWrap drains the given QuicBuffer and sends it to the - // uv_udp_t handle. When the async operation completes, the done_cb - // is invoked with the status and the user_data forwarded on. - class SendWrap : public SendWrapBase { - public: - SendWrap( - QuicSocket* socket, - SocketAddress* dest, - QuicBuffer* buffer, - BaseObjectPtr session, - const char* diagnostic_label = nullptr); - - SendWrap( - QuicSocket* socket, - const sockaddr* dest, - QuicBuffer* buffer, - BaseObjectPtr session, - const char* diagnostic_label = nullptr); - - void Done(int status) override; - - int Send() override; - - size_t Length() override { return length_; } - - private: - QuicBuffer* buffer_; BaseObjectPtr session_; - size_t length_ = 0; + QuicBuffer* buffer_ = nullptr; + MallocedBuffer data_; + const char* diagnostic_label_ = nullptr; + size_t total_length_; }; - class SendWrapStack : public SendWrapBase { - public: - SendWrapStack( - QuicSocket* socket, - const sockaddr* dest, - size_t len, - const char* diagnostic_label = nullptr); - - int Send() override; - - uint8_t* buffer() { return *buf_; } - - void SetLength(size_t len) { - buf_.SetLength(len); - } + SendWrap* last_created_send_wrap_ = nullptr; - size_t Length() override { return buf_.length(); } - - private: - MaybeStackBuffer buf_; - }; + int Send(const sockaddr* addr, + MallocedBuffer&& data, + const char* diagnostic_label = "unspecified"); }; } // namespace quic diff --git a/src/node_quic_util.h b/src/node_quic_util.h index 0f7ecbbd3a..1d86967f92 100644 --- a/src/node_quic_util.h +++ b/src/node_quic_util.h @@ -271,14 +271,6 @@ class SocketAddress { memcpy(&address_, source, GetAddressLen(source)); } - void Set(uv_udp_t* handle) { - int addrlen = sizeof(address_); - CHECK_EQ(uv_udp_getsockname( - handle, - reinterpret_cast(&address_), - &addrlen), 0); - } - void Update(const ngtcp2_addr* addr) { memcpy(&address_, addr->addr, addr->addrlen); } diff --git a/src/udp_wrap.cc b/src/udp_wrap.cc index 0f629f623e..8adb1e2186 100644 --- a/src/udp_wrap.cc +++ b/src/udp_wrap.cc @@ -184,6 +184,7 @@ void UDPWrap::Initialize(Local target, Local constants = Object::New(env->isolate()); NODE_DEFINE_CONSTANT(constants, UV_UDP_IPV6ONLY); + NODE_DEFINE_CONSTANT(constants, UV_UDP_REUSEADDR); target->Set(context, env->constants_string(), constants).Check(); diff --git a/test/parallel/test-quic-quicsocket.js b/test/parallel/test-quic-quicsocket.js index 80130033aa..deffe35d9b 100644 --- a/test/parallel/test-quic-quicsocket.js +++ b/test/parallel/test-quic-quicsocket.js @@ -124,20 +124,10 @@ socket.on('ready', common.mustCall(() => { socket.setBroadcast(); socket.setBroadcast(true); socket.setBroadcast(false); - [1, 'test', {}, NaN, 1n, null].forEach((i) => { - assert.throws(() => socket.setBroadcast(i), { - code: 'ERR_INVALID_ARG_TYPE' - }); - }); socket.setMulticastLoopback(); socket.setMulticastLoopback(true); socket.setMulticastLoopback(false); - [1, 'test', {}, NaN, 1n, null].forEach((i) => { - assert.throws(() => socket.setMulticastLoopback(i), { - code: 'ERR_INVALID_ARG_TYPE' - }); - }); socket.setMulticastInterface('0.0.0.0');