Skip to content

Commit

Permalink
src: enable Writev to write beyond INT_MAX
Browse files Browse the repository at this point in the history
Vectored writes that contain large string data (accumulated from
small strings over time due to congestion in the stream) fails
with ENOBUFS if the cumulative chunk size is more than INT_MAX.

Under backpressure situations failure is justified in JS land with heap
OOM as well as in the native land with libuv resource exhaustion etc,
but the stream wrap that sits in the middle which just facilitates the
transport between layers is not.

Detect the large data situation, and split those at right boundaries.
Carry out intermediary writes through dummy write_wrap objects to avoid
multiple callbacks to the requestor.

Fixes: nodejs#24992
  • Loading branch information
gireeshpunathil committed Dec 14, 2018
1 parent 80ab537 commit 93b0188
Show file tree
Hide file tree
Showing 2 changed files with 55 additions and 19 deletions.
67 changes: 48 additions & 19 deletions src/stream_base.cc
Original file line number Diff line number Diff line change
Expand Up @@ -75,14 +75,11 @@ int StreamBase::Writev(const FunctionCallbackInfo<Value>& args) {
else
count = chunks->Length() >> 1;

MaybeStackBuffer<uv_buf_t, 16> bufs(count);

size_t storage_size = 0;
size_t offset;

if (!all_buffers) {
// Determine storage size first
for (size_t i = 0; i < count; i++) {
size_t index = 0;
for (size_t i = index; i < count; i++) {
Local<Value> chunk = chunks->Get(env->context(), i * 2).ToLocalChecked();

if (Buffer::HasInstance(chunk))
Expand All @@ -101,31 +98,57 @@ int StreamBase::Writev(const FunctionCallbackInfo<Value>& args) {
.To(&chunk_size))
return 0;
storage_size += chunk_size;
if (storage_size >= INT_MAX) {
Local<Object> temp = env->write_wrap_template()
->NewInstance(env->context())
.ToLocalChecked();
StreamReq::ResetObject(temp);

int err = WritevHelper(storage_size - chunk_size,
index,
i - 1,
all_buffers,
env,
chunks,
temp);
if (err != 0) {
return err;
}
index = i - 1;
storage_size = 0;
}
}

if (storage_size > INT_MAX)
return UV_ENOBUFS;
StreamReq::ResetObject(req_wrap_obj);
return WritevHelper(
storage_size, index, count, all_buffers, env, chunks, req_wrap_obj);
} else {
for (size_t i = 0; i < count; i++) {
Local<Value> chunk = chunks->Get(env->context(), i).ToLocalChecked();
bufs[i].base = Buffer::Data(chunk);
bufs[i].len = Buffer::Length(chunk);
}
return WritevHelper(
storage_size, 0, count, all_buffers, env, chunks, req_wrap_obj);
}
}

int StreamBase::WritevHelper(size_t storage_size,
int index,
int count,
bool all_buffers,
Environment* env,
const Local<Array>& chunks,
const Local<Object>& req_wrap_obj) {
MaybeStackBuffer<uv_buf_t, 16> bufs(count - index);
size_t offset;
MallocedBuffer<char> storage;
if (storage_size > 0)
storage = MallocedBuffer<char>(storage_size);

offset = 0;
if (!all_buffers) {
for (size_t i = 0; i < count; i++) {
for (size_t i = index; i < count; i++) {
Local<Value> chunk = chunks->Get(env->context(), i * 2).ToLocalChecked();

// Write buffer
if (Buffer::HasInstance(chunk)) {
bufs[i].base = Buffer::Data(chunk);
bufs[i].len = Buffer::Length(chunk);
bufs[i - index].base = Buffer::Data(chunk);
bufs[i - index].len = Buffer::Length(chunk);
continue;
}

Expand All @@ -142,13 +165,19 @@ int StreamBase::Writev(const FunctionCallbackInfo<Value>& args) {
str_size,
string,
encoding);
bufs[i].base = str_storage;
bufs[i].len = str_size;
bufs[i - index].base = str_storage;
bufs[i - index].len = str_size;
offset += str_size;
}
} else {
for (size_t i = 0; i < count; i++) {
Local<Value> chunk = chunks->Get(env->context(), i).ToLocalChecked();
bufs[i].base = Buffer::Data(chunk);
bufs[i].len = Buffer::Length(chunk);
}
}

StreamWriteResult res = Write(*bufs, count, nullptr, req_wrap_obj);
StreamWriteResult res = Write(*bufs, count - index, nullptr, req_wrap_obj);
SetWriteResult(res);
if (res.wrap != nullptr && storage_size > 0) {
res.wrap->SetAllocatedStorage(storage.release(), storage_size);
Expand Down
7 changes: 7 additions & 0 deletions src/stream_base.h
Original file line number Diff line number Diff line change
Expand Up @@ -346,6 +346,13 @@ class StreamBase : public StreamResource {
friend class WriteWrap;
friend class ShutdownWrap;
friend class Environment; // For kNumStreamBaseStateFields.
int WritevHelper(size_t storage_size,
int index,
int count,
bool all_buffers,
Environment* env,
const v8::Local<v8::Array>& chunks,
const v8::Local<v8::Object>& req_wrap_obj);
};


Expand Down

0 comments on commit 93b0188

Please sign in to comment.