Skip to content

Commit

Permalink
dgram: make UDPWrap more reusable
Browse files Browse the repository at this point in the history
Allow using the handle more directly for I/O in other parts of
the codebase.

Originally landed in the QUIC repo

Original review metadata:

```
  PR-URL: nodejs/quic#165
  Reviewed-By: James M Snell <jasnell@gmail.com>
  Reviewed-By: Daniel Bevenius <daniel.bevenius@gmail.com>
```

Signed-off-by: James M Snell <jasnell@gmail.com>

PR-URL: #31871
Reviewed-By: Anna Henningsen <anna@addaleax.net>
Reviewed-By: Ben Noordhuis <info@bnoordhuis.nl>
  • Loading branch information
addaleax authored and targos committed Apr 28, 2020
1 parent c979aea commit 322a998
Show file tree
Hide file tree
Showing 3 changed files with 265 additions and 60 deletions.
4 changes: 3 additions & 1 deletion lib/dgram.js
Original file line number Diff line number Diff line change
Expand Up @@ -232,7 +232,9 @@ Socket.prototype.bind = function(port_, address_ /* , callback */) {
this.on('listening', onListening);
}

if (port instanceof UDP) {
if (port !== null &&
typeof port === 'object' &&
typeof port.recvStart === 'function') {
replaceHandle(this, port);
startListening(this);
return this;
Expand Down
205 changes: 151 additions & 54 deletions src/udp_wrap.cc
Original file line number Diff line number Diff line change
Expand Up @@ -69,18 +69,57 @@ SendWrap::SendWrap(Environment* env,
}


inline bool SendWrap::have_callback() const {
bool SendWrap::have_callback() const {
return have_callback_;
}

UDPListener::~UDPListener() {
if (wrap_ != nullptr)
wrap_->set_listener(nullptr);
}

UDPWrapBase::~UDPWrapBase() {
set_listener(nullptr);
}

UDPListener* UDPWrapBase::listener() const {
CHECK_NOT_NULL(listener_);
return listener_;
}

void UDPWrapBase::set_listener(UDPListener* listener) {
if (listener_ != nullptr)
listener_->wrap_ = nullptr;
listener_ = listener;
if (listener_ != nullptr) {
CHECK_NULL(listener_->wrap_);
listener_->wrap_ = this;
}
}

UDPWrapBase* UDPWrapBase::FromObject(Local<Object> obj) {
CHECK_GT(obj->InternalFieldCount(), UDPWrapBase::kUDPWrapBaseField);
return static_cast<UDPWrapBase*>(
obj->GetAlignedPointerFromInternalField(UDPWrapBase::kUDPWrapBaseField));
}

void UDPWrapBase::AddMethods(Environment* env, Local<FunctionTemplate> t) {
env->SetProtoMethod(t, "recvStart", RecvStart);
env->SetProtoMethod(t, "recvStop", RecvStop);
}

UDPWrap::UDPWrap(Environment* env, Local<Object> object)
: HandleWrap(env,
object,
reinterpret_cast<uv_handle_t*>(&handle_),
AsyncWrap::PROVIDER_UDPWRAP) {
object->SetAlignedPointerInInternalField(
UDPWrapBase::kUDPWrapBaseField, static_cast<UDPWrapBase*>(this));

int r = uv_udp_init(env->event_loop(), &handle_);
CHECK_EQ(r, 0); // can't fail anyway

set_listener(this);
}


Expand All @@ -91,7 +130,8 @@ void UDPWrap::Initialize(Local<Object> target,
Environment* env = Environment::GetCurrent(context);

Local<FunctionTemplate> t = env->NewFunctionTemplate(New);
t->InstanceTemplate()->SetInternalFieldCount(UDPWrap::kInternalFieldCount);
t->InstanceTemplate()->SetInternalFieldCount(
UDPWrapBase::kInternalFieldCount);
Local<String> udpString =
FIXED_ONE_BYTE_STRING(env->isolate(), "UDP");
t->SetClassName(udpString);
Expand All @@ -112,6 +152,7 @@ void UDPWrap::Initialize(Local<Object> target,
Local<FunctionTemplate>(),
attributes);

UDPWrapBase::AddMethods(env, t);
env->SetProtoMethod(t, "open", Open);
env->SetProtoMethod(t, "bind", Bind);
env->SetProtoMethod(t, "connect", Connect);
Expand All @@ -120,8 +161,6 @@ void UDPWrap::Initialize(Local<Object> target,
env->SetProtoMethod(t, "connect6", Connect6);
env->SetProtoMethod(t, "send6", Send6);
env->SetProtoMethod(t, "disconnect", Disconnect);
env->SetProtoMethod(t, "recvStart", RecvStart);
env->SetProtoMethod(t, "recvStop", RecvStop);
env->SetProtoMethod(t, "getpeername",
GetSockOrPeerName<UDPWrap, uv_udp_getpeername>);
env->SetProtoMethod(t, "getsockname",
Expand Down Expand Up @@ -220,6 +259,9 @@ void UDPWrap::DoBind(const FunctionCallbackInfo<Value>& args, int family) {
flags);
}

if (err == 0)
wrap->listener()->OnAfterBind();

args.GetReturnValue().Set(err);
}

Expand Down Expand Up @@ -464,14 +506,10 @@ void UDPWrap::DoSend(const FunctionCallbackInfo<Value>& args, int family) {
CHECK(args[3]->IsBoolean());
}

Local<Object> req_wrap_obj = args[0].As<Object>();
Local<Array> chunks = args[1].As<Array>();
// it is faster to fetch the length of the
// array in js-land
size_t count = args[2].As<Uint32>()->Value();
const bool have_callback = sendto ? args[5]->IsTrue() : args[3]->IsTrue();

size_t msg_size = 0;

MaybeStackBuffer<uv_buf_t, 16> bufs(count);

Expand All @@ -483,7 +521,6 @@ void UDPWrap::DoSend(const FunctionCallbackInfo<Value>& args, int family) {
size_t length = Buffer::Length(chunk);

bufs[i] = uv_buf_init(Buffer::Data(chunk), length);
msg_size += length;
}

int err = 0;
Expand All @@ -493,14 +530,36 @@ void UDPWrap::DoSend(const FunctionCallbackInfo<Value>& args, int family) {
const unsigned short port = args[3].As<Uint32>()->Value();
node::Utf8Value address(env->isolate(), args[4]);
err = sockaddr_for_family(family, address.out(), port, &addr_storage);
if (err == 0) {
if (err == 0)
addr = reinterpret_cast<sockaddr*>(&addr_storage);
}
}

uv_buf_t* bufs_ptr = *bufs;
if (err == 0 && !UNLIKELY(env->options()->test_udp_no_try_send)) {
err = uv_udp_try_send(&wrap->handle_, bufs_ptr, count, addr);
if (err == 0) {
wrap->current_send_req_wrap_ = args[0].As<Object>();
wrap->current_send_has_callback_ =
sendto ? args[5]->IsTrue() : args[3]->IsTrue();

err = wrap->Send(*bufs, count, addr);

wrap->current_send_req_wrap_.Clear();
wrap->current_send_has_callback_ = false;
}

args.GetReturnValue().Set(err);
}

ssize_t UDPWrap::Send(uv_buf_t* bufs_ptr,
size_t count,
const sockaddr* addr) {
if (IsHandleClosing()) return UV_EBADF;

size_t msg_size = 0;
for (size_t i = 0; i < count; i++)
msg_size += bufs_ptr[i].len;

int err = 0;
if (!UNLIKELY(env()->options()->test_udp_no_try_send)) {
err = uv_udp_try_send(&handle_, bufs_ptr, count, addr);
if (err == UV_ENOSYS || err == UV_EAGAIN) {
err = 0;
} else if (err >= 0) {
Expand All @@ -518,28 +577,41 @@ void UDPWrap::DoSend(const FunctionCallbackInfo<Value>& args, int family) {
CHECK_EQ(static_cast<size_t>(err), msg_size);
// + 1 so that the JS side can distinguish 0-length async sends from
// 0-length sync sends.
args.GetReturnValue().Set(static_cast<uint32_t>(msg_size) + 1);
return;
return msg_size + 1;
}
}
}

if (err == 0) {
AsyncHooks::DefaultTriggerAsyncIdScope trigger_scope(wrap);
SendWrap* req_wrap = new SendWrap(env, req_wrap_obj, have_callback);
req_wrap->msg_size = msg_size;

err = req_wrap->Dispatch(uv_udp_send,
&wrap->handle_,
bufs_ptr,
count,
addr,
OnSend);
AsyncHooks::DefaultTriggerAsyncIdScope trigger_scope(this);
ReqWrap<uv_udp_send_t>* req_wrap = listener()->CreateSendWrap(msg_size);
if (req_wrap == nullptr) return UV_ENOSYS;

err = req_wrap->Dispatch(
uv_udp_send,
&handle_,
bufs_ptr,
count,
addr,
uv_udp_send_cb{[](uv_udp_send_t* req, int status) {
UDPWrap* self = ContainerOf(&UDPWrap::handle_, req->handle);
self->listener()->OnSendDone(
ReqWrap<uv_udp_send_t>::from_req(req), status);
}});
if (err)
delete req_wrap;
}

args.GetReturnValue().Set(err);
return err;
}


ReqWrap<uv_udp_send_t>* UDPWrap::CreateSendWrap(size_t msg_size) {
SendWrap* req_wrap = new SendWrap(env(),
current_send_req_wrap_,
current_send_has_callback_);
req_wrap->msg_size = msg_size;
return req_wrap;
}


Expand All @@ -553,31 +625,46 @@ void UDPWrap::Send6(const FunctionCallbackInfo<Value>& args) {
}


void UDPWrap::RecvStart(const FunctionCallbackInfo<Value>& args) {
UDPWrap* wrap;
ASSIGN_OR_RETURN_UNWRAP(&wrap,
args.Holder(),
args.GetReturnValue().Set(UV_EBADF));
int err = uv_udp_recv_start(&wrap->handle_, OnAlloc, OnRecv);
AsyncWrap* UDPWrap::GetAsyncWrap() {
return this;
}

int UDPWrap::GetPeerName(sockaddr* name, int* namelen) {
return uv_udp_getpeername(&handle_, name, namelen);
}

int UDPWrap::GetSockName(sockaddr* name, int* namelen) {
return uv_udp_getsockname(&handle_, name, namelen);
}

void UDPWrapBase::RecvStart(const FunctionCallbackInfo<Value>& args) {
UDPWrapBase* wrap = UDPWrapBase::FromObject(args.Holder());
args.GetReturnValue().Set(wrap == nullptr ? UV_EBADF : wrap->RecvStart());
}

int UDPWrap::RecvStart() {
if (IsHandleClosing()) return UV_EBADF;
int err = uv_udp_recv_start(&handle_, OnAlloc, OnRecv);
// UV_EALREADY means that the socket is already bound but that's okay
if (err == UV_EALREADY)
err = 0;
args.GetReturnValue().Set(err);
return err;
}


void UDPWrap::RecvStop(const FunctionCallbackInfo<Value>& args) {
UDPWrap* wrap;
ASSIGN_OR_RETURN_UNWRAP(&wrap,
args.Holder(),
args.GetReturnValue().Set(UV_EBADF));
int r = uv_udp_recv_stop(&wrap->handle_);
args.GetReturnValue().Set(r);
void UDPWrapBase::RecvStop(const FunctionCallbackInfo<Value>& args) {
UDPWrapBase* wrap = UDPWrapBase::FromObject(args.Holder());
args.GetReturnValue().Set(wrap == nullptr ? UV_EBADF : wrap->RecvStop());
}

int UDPWrap::RecvStop() {
if (IsHandleClosing()) return UV_EBADF;
return uv_udp_recv_stop(&handle_);
}


void UDPWrap::OnSend(uv_udp_send_t* req, int status) {
std::unique_ptr<SendWrap> req_wrap{static_cast<SendWrap*>(req->data)};
void UDPWrap::OnSendDone(ReqWrap<uv_udp_send_t>* req, int status) {
std::unique_ptr<SendWrap> req_wrap{static_cast<SendWrap*>(req)};
if (req_wrap->have_callback()) {
Environment* env = req_wrap->env();
HandleScope handle_scope(env->isolate());
Expand All @@ -594,43 +681,53 @@ void UDPWrap::OnSend(uv_udp_send_t* req, int status) {
void UDPWrap::OnAlloc(uv_handle_t* handle,
size_t suggested_size,
uv_buf_t* buf) {
UDPWrap* wrap = static_cast<UDPWrap*>(handle->data);
*buf = wrap->env()->AllocateManaged(suggested_size).release();
UDPWrap* wrap = ContainerOf(&UDPWrap::handle_,
reinterpret_cast<uv_udp_t*>(handle));
*buf = wrap->listener()->OnAlloc(suggested_size);
}

uv_buf_t UDPWrap::OnAlloc(size_t suggested_size) {
return env()->AllocateManaged(suggested_size).release();
}

void UDPWrap::OnRecv(uv_udp_t* handle,
ssize_t nread,
const uv_buf_t* buf_,
const struct sockaddr* addr,
const uv_buf_t* buf,
const sockaddr* addr,
unsigned int flags) {
UDPWrap* wrap = static_cast<UDPWrap*>(handle->data);
Environment* env = wrap->env();
UDPWrap* wrap = ContainerOf(&UDPWrap::handle_, handle);
wrap->listener()->OnRecv(nread, *buf, addr, flags);
}

AllocatedBuffer buf(env, *buf_);
void UDPWrap::OnRecv(ssize_t nread,
const uv_buf_t& buf_,
const sockaddr* addr,
unsigned int flags) {
Environment* env = this->env();
AllocatedBuffer buf(env, buf_);
if (nread == 0 && addr == nullptr) {
return;
}

HandleScope handle_scope(env->isolate());
Context::Scope context_scope(env->context());

Local<Object> wrap_obj = wrap->object();
Local<Value> argv[] = {
Integer::New(env->isolate(), nread),
wrap_obj,
object(),
Undefined(env->isolate()),
Undefined(env->isolate())
};

if (nread < 0) {
wrap->MakeCallback(env->onmessage_string(), arraysize(argv), argv);
MakeCallback(env->onmessage_string(), arraysize(argv), argv);
return;
}

buf.Resize(nread);
argv[2] = buf.ToBuffer().ToLocalChecked();
argv[3] = AddressToJS(env, addr);
wrap->MakeCallback(env->onmessage_string(), arraysize(argv), argv);
MakeCallback(env->onmessage_string(), arraysize(argv), argv);
}

MaybeLocal<Object> UDPWrap::Instantiate(Environment* env,
Expand Down
Loading

0 comments on commit 322a998

Please sign in to comment.