diff --git a/src/dataqueue/queue.cc b/src/dataqueue/queue.cc index a1bb3b5a78ccc2..9e00ad09f601f4 100644 --- a/src/dataqueue/queue.cc +++ b/src/dataqueue/queue.cc @@ -41,6 +41,8 @@ using v8::Value; using v8::String; using v8::Global; using v8::Function; +using v8::Int32; +using v8::Uint32; namespace { // ============================================================================ @@ -959,8 +961,8 @@ class StreamEntry final : public EntryBase { // Creating the callback failed for whatever reason. The error will propagate // thank to the callback scope, but let's end the reader and fail this read. ended_ = true; - std::move(next)(bob::STATUS_EOS, nullptr, 0, [](size_t) {}); - return bob::STATUS_EOS; + std::move(next)(bob::STATUS_FAILED, nullptr, 0, [](size_t) {}); + return bob::STATUS_FAILED; } Local argv[] = { callback }; @@ -969,10 +971,9 @@ class StreamEntry final : public EntryBase { if (!pull->Call(isolate->GetCurrentContext(), wrap, arraysize(argv), argv).ToLocal(&ret)) { // The call failed for whatever reason. The error will propagate thanks to the // callback scope, but let's end the reader and fail this read. - // TODO(@jasnell, @flakey5): Bob streams need a proper error status... ended_ = true; - std::move(next)(bob::STATUS_EOS, nullptr, 0, [](size_t) {}); - return bob::STATUS_EOS; + std::move(next)(bob::STATUS_FAILED, nullptr, 0, [](size_t) {}); + return bob::STATUS_FAILED; } return bob::STATUS_WAIT; @@ -1386,10 +1387,13 @@ class FdEntry final : public EntryBase { CHECK(args.IsConstructCall()); Environment* env = Environment::GetCurrent(args); - // TODO(dataqueue): Get these from the arguments - int fd = 0; - size_t start = 0; - size_t end = 0; + CHECK(args[0]->IsInt32()); + CHECK(args[1]->IsUint32()); + CHECK(args[2]->IsUint32()); + + int fd = args[0].As()->Value(); + size_t start = args[1].As()->Value(); + size_t end = args[1].As()->Value(); new Wrap(env, args.This(), fd, start, Just(end)); } diff --git a/src/node_blob.cc b/src/node_blob.cc index 1974bb015b7edb..9fdba0454fa052 100644 --- a/src/node_blob.cc +++ b/src/node_blob.cc @@ -71,7 +71,8 @@ bool Blob::HasInstance(Environment* env, v8::Local object) { return GetConstructorTemplate(env)->HasInstance(object); } -BaseObjectPtr Blob::Create(Environment* env, std::shared_ptr data_queue) { +BaseObjectPtr Blob::Create( + Environment* env, std::shared_ptr data_queue) { HandleScope scope(env->isolate()); Local ctor; @@ -90,9 +91,8 @@ void Blob::New(const FunctionCallbackInfo& args) { CHECK(args[0]->IsArray()); // sources CHECK(args[1]->IsUint32()); // length - // TODO(@flakey5): revisit when DataQueue is complete + // TODO(@flakey5): delete //std::vector entries; - //size_t length = args[1].As()->Value(); //size_t len = 0; //Local ary = args[0].As(); @@ -119,10 +119,32 @@ void Blob::New(const FunctionCallbackInfo& args) { //} //CHECK_EQ(length, len); - // TODO(@flakey5): get dataqueue from js - std::shared_ptr data_queue = nullptr; + Local array = args[0].As(); + size_t length = args[1].As()->Value(); + std::vector> entries(length); + + for (size_t i = 0; i < array->Length(); i++) { + Local entry; + if (!array->Get(env->context(), i).ToLocal(&entry)) { + return; + } + + // TODO(@flakey5): check for different entry types + CHECK(entry->IsArrayBufferView() || Blob::HasInstance(env, entry)); + if (entry->IsArrayBufferView()) { + Local view = entry.As(); + CHECK_EQ(view->ByteOffset(), 0); - BaseObjectPtr blob = Create(env, data_queue); + entries[i] = DataQueue::CreateInMemoryEntryFromView(view); + } else { + Blob* blob; + ASSIGN_OR_RETURN_UNWRAP(&blob, entry); + + entries[i] = DataQueue::CreateDataQueueEntry(blob->data_queue_); + } + } + + BaseObjectPtr blob = Create(env, DataQueue::CreateIdempotent(entries)); if (blob) args.GetReturnValue().Set(blob->object()); } @@ -150,16 +172,37 @@ void Blob::ToSlice(const FunctionCallbackInfo& args) { } void Blob::MemoryInfo(MemoryTracker* tracker) const { - tracker->TrackFieldWithSize("store", length()); + tracker->TrackField("data_queue_", data_queue_); } MaybeLocal Blob::GetArrayBuffer(Environment* env) { - // TODO(@flakey5): figure out how this'll work EscapableHandleScope scope(env->isolate()); size_t len = length(); std::shared_ptr store = ArrayBuffer::NewBackingStore(env->isolate(), len); + if (len > 0) { + std::unique_ptr reader = this->data_queue_->getReader(); + DataQueue::Vec vec; + size_t offset = 0; + while (reader->Pull( + [&store, &offset](int, + const DataQueue::Vec* vec, + size_t count, + bob::Done done) { + for (size_t i = 0; i < count; i++) { + memcpy( + static_cast(store->Data()) + offset, + vec[i].base, + vec[i].len); + offset += vec[i].len; + } + }, + bob::Options::OPTIONS_NONE, + &vec, + len) != bob::Status::STATUS_EOS); + } + /*if (len > 0) { unsigned char* dest = static_cast(store->Data()); size_t total = 0; @@ -281,7 +324,7 @@ void Blob::GetDataObject(const v8::FunctionCallbackInfo& args) { } } -// TODO(@flakey5): revisit when DataQueue is complete +// TODO(@flakey5): delete //FixedSizeBlobCopyJob::FixedSizeBlobCopyJob( // Environment* env, // Local object, @@ -471,7 +514,7 @@ void Blob::RegisterExternalReferences(ExternalReferenceRegistry* registry) { registry->Register(Blob::GetDataObject); registry->Register(Blob::RevokeDataObject); - FixedSizeBlobCopyJob::RegisterExternalReferences(registry); + //FixedSizeBlobCopyJob::RegisterExternalReferences(registry); } } // namespace node diff --git a/src/node_blob.h b/src/node_blob.h index a81b0189e3843d..460f27d571b9a8 100644 --- a/src/node_blob.h +++ b/src/node_blob.h @@ -85,7 +85,7 @@ class Blob : public BaseObject { std::shared_ptr data_queue_; }; -// TODO(@flakey5): revisit when DataQueue is complete +// TODO(@flakey5): delete //class FixedSizeBlobCopyJob : public AsyncWrap, public ThreadPoolWork { // public: // enum class Mode { diff --git a/src/node_bob.h b/src/node_bob.h index 9c4cded3e31978..d0160b393c9b39 100644 --- a/src/node_bob.h +++ b/src/node_bob.h @@ -10,6 +10,10 @@ constexpr size_t kMaxCountHint = 16; // Negative status codes indicate error conditions. enum Status : int { + // Indicates that there was an error while pulling. + // Should be treated similar to STATUS_EOS + STATUS_FAILED = -2, + // Indicates that an attempt was made to pull after end. STATUS_EOS = -1,