From afef8f26145a3c74e43e6731c1336ccbc7f16c90 Mon Sep 17 00:00:00 2001 From: Robert Pang Date: Wed, 12 Sep 2018 15:13:38 -0700 Subject: [PATCH] Fixes #469: Optimize QLResultSet to serialize rows results directly Summary: Optimize QLResultSet by avoiding memory allocation and copying of rows by serializing the rows directly. This improves CassandraPersonalization read workload which does range scan by 10%. Test Plan: Load data: $ java -jar ~/code/yugabyte/java/yb-loadtester/target/yb-sample-apps.jar -workload CassandraPersonalization -num_threads_read 0 -num_threads_write 1 -num_new_coupons_per_customer 3000 -nodes 127.0.0.1:9042 -nouuid -num_unique_keys 300 Read test: $ java -jar ~/code/yugabyte/java/yb-loadtester/target/yb-sample-apps.jar -workload CassandraPersonalization -nodes 127.0.0.1:9042 -nouuid -max_written_key 300 -read_only -num_threads_write 0 -num_threads_read 1 Before: ``` 2018-09-12 11:22:56,052 [INFO|com.yugabyte.sample.common.metrics.MetricsTracker|MetricsTracker] Read: 42.17 ops/sec (23.69 ms/op), 2129 total ops | Write: 0.00 ops/sec (0.00 ms/op), 0 total ops | Uptime: 50030 ms | maxWrittenKey: 300 | maxGeneratedKey: 300 | 2018-09-12 11:23:01,056 [INFO|com.yugabyte.sample.common.metrics.MetricsTracker|MetricsTracker] Read: 41.96 ops/sec (23.79 ms/op), 2339 total ops | Write: 0.00 ops/sec (0.00 ms/op), 0 total ops | Uptime: 55034 ms | maxWrittenKey: 300 | maxGeneratedKey: 300 | 2018-09-12 11:23:06,059 [INFO|com.yugabyte.sample.common.metrics.MetricsTracker|MetricsTracker] Read: 42.78 ops/sec (23.40 ms/op), 2553 total ops | Write: 0.00 ops/sec (0.00 ms/op), 0 total ops | Uptime: 60037 ms | maxWrittenKey: 300 | maxGeneratedKey: 300 | ``` After: ``` 2018-09-12 11:24:28,430 [INFO|com.yugabyte.sample.common.metrics.MetricsTracker|MetricsTracker] Read: 46.56 ops/sec (21.50 ms/op), 2310 total ops | Write: 0.00 ops/sec (0.00 ms/op), 0 total ops | Uptime: 50026 ms | maxWrittenKey: 300 | maxGeneratedKey: 300 | 2018-09-12 11:24:33,432 [INFO|com.yugabyte.sample.common.metrics.MetricsTracker|MetricsTracker] Read: 46.37 ops/sec (21.53 ms/op), 2542 total ops | Write: 0.00 ops/sec (0.00 ms/op), 0 total ops | Uptime: 55028 ms | maxWrittenKey: 300 | maxGeneratedKey: 300 | 2018-09-12 11:24:38,435 [INFO|com.yugabyte.sample.common.metrics.MetricsTracker|MetricsTracker] Read: 46.57 ops/sec (21.49 ms/op), 2775 total ops | Write: 0.00 ops/sec (0.00 ms/op), 0 total ops | Uptime: 60031 ms | maxWrittenKey: 300 | maxGeneratedKey: 300 | ``` Reviewers: mihnea Reviewed By: mihnea Subscribers: yql Differential Revision: https://phabricator.dev.yugabyte.com/D5465 --- src/yb/common/ql_resultset.cc | 46 ++++++-------------- src/yb/common/ql_resultset.h | 67 ++++++++---------------------- src/yb/common/ql_rowblock.cc | 10 ++--- src/yb/common/wire_protocol.h | 20 +++++++-- src/yb/docdb/doc_operation-test.cc | 14 +++---- src/yb/docdb/doc_operation.cc | 12 +++--- src/yb/tablet/abstract_tablet.cc | 12 +----- 7 files changed, 65 insertions(+), 116 deletions(-) diff --git a/src/yb/common/ql_resultset.cc b/src/yb/common/ql_resultset.cc index 9277417e4c85..4a3dd07d358c 100644 --- a/src/yb/common/ql_resultset.cc +++ b/src/yb/common/ql_resultset.cc @@ -14,9 +14,8 @@ namespace yb { QLRSRowDesc::QLRSRowDesc(const QLRSRowDescPB& desc_pb) { rscol_descs_.reserve(desc_pb.rscol_descs().size()); - for (auto rscol_desc_pb : desc_pb.rscol_descs()) { - rscol_descs_.emplace_back(rscol_desc_pb.name(), - QLType::FromQLTypePB(rscol_desc_pb.ql_type())); + for (const auto& rscol_desc_pb : desc_pb.rscol_descs()) { + rscol_descs_.emplace_back(rscol_desc_pb); } } @@ -25,45 +24,24 @@ QLRSRowDesc::~QLRSRowDesc() { //-------------------------------------------------------------------------------------------------- -QLRSRow::QLRSRow(int32_t rscol_count) : rscols_(rscol_count) { +QLResultSet::QLResultSet(const QLRSRowDesc* rsrow_desc, faststring* rows_data) + : rsrow_desc_(rsrow_desc), rows_data_(rows_data) { + CQLEncodeLength(0, rows_data_); } -QLRSRow::~QLRSRow() { -} - -CHECKED_STATUS QLRSRow::CQLSerialize(const QLClient& client, - const QLRSRowDesc& rsrow_desc, - faststring* buffer) const { - DCHECK_EQ(rsrow_desc.rscol_count(), rscol_count()) << "Wrong count of fields in result set"; - int idx = 0; - for (auto rscol_desc : rsrow_desc.rscol_descs()) { - rscols_[idx].Serialize(rscol_desc.ql_type(), client, buffer); - idx++; - } - return Status::OK(); -} - -//-------------------------------------------------------------------------------------------------- - -QLResultSet::QLResultSet() { +QLResultSet::~QLResultSet() { } -QLResultSet::~QLResultSet() { +void QLResultSet::AllocateRow() { + CQLEncodeLength(CQLDecodeLength(rows_data_->data()) + 1, rows_data_->data()); } -QLRSRow *QLResultSet::AllocateRSRow(int32_t rscol_count) { - rsrows_.emplace_back(rscol_count); - return &rsrows_.back(); +void QLResultSet::AppendColumn(const size_t index, const QLValue& value) { + value.Serialize(rsrow_desc_->rscol_descs()[index].ql_type(), YQL_CLIENT_CQL, rows_data_); } -CHECKED_STATUS QLResultSet::CQLSerialize(const QLClient& client, - const QLRSRowDesc& rsrow_desc, - faststring* buffer) const { - CQLEncodeLength(rsrows_.size(), buffer); - for (const auto& rsrow : rsrows_) { - RETURN_NOT_OK(rsrow.CQLSerialize(client, rsrow_desc, buffer)); - } - return Status::OK(); +size_t QLResultSet::rsrow_count() const { + return CQLDecodeLength(rows_data_->data()); } } // namespace yb diff --git a/src/yb/common/ql_resultset.h b/src/yb/common/ql_resultset.h index 1be6de729d9e..efa71fb52db3 100644 --- a/src/yb/common/ql_resultset.h +++ b/src/yb/common/ql_resultset.h @@ -2,9 +2,7 @@ // Copyright (c) YugaByte, Inc. // // This module defines the ResultSet that QL database returns to a query request. -// ResultSet is a set of rows of data that is returned by a query request, and each of selected row -// is name "rsrow" in our code. We don't call "rsrow" tuple to avoid conflict with TUPLE datatype -// in Apache Cassandra. +// QLResultSet is a set of rows of data that is returned by a query request. // - Within our code, we call it "rsrow" instead of row to distinguish between selected-rows and // rows of a table in the database. // - Similarly, we use "rscol" in place of "column". @@ -13,6 +11,9 @@ // NOTE: // - This should be merged or shared a super class with ql_rowblock.cc. // - This will be done in the next diff. We don't do this now to avoid large code modifications. +// - For optimization, columns and rows are serialized (in CQL wire format) directly for return to +// call. If there is a need to manipulate the rows before return, QLResultSet should be changed to +// an interface with multiple implementations for different use-cases. //-------------------------------------------------------------------------------------------------- #ifndef YB_COMMON_QL_RESULTSET_H_ @@ -28,19 +29,17 @@ class QLRSRowDesc { public: class RSColDesc { public: - RSColDesc() { + explicit RSColDesc(const QLRSColDescPB& desc_pb) + : name_(desc_pb.name()), ql_type_(QLType::FromQLTypePB(desc_pb.ql_type())) { } - RSColDesc(const string& name, const QLType::SharedPtr& ql_type) - : name_(name), ql_type_(ql_type) { - } - const string& name() { + const std::string& name() const { return name_; } - const QLType::SharedPtr& ql_type() { + const QLType::SharedPtr& ql_type() const { return ql_type_; } private: - string name_; + std::string name_; QLType::SharedPtr ql_type_; }; @@ -60,57 +59,27 @@ class QLRSRowDesc { }; //-------------------------------------------------------------------------------------------------- -// A rsrow represents the values of a row in the resultset. -class QLRSRow { - public: - explicit QLRSRow(int32_t rscol_count); - QLRSRow(QLRSRow&& other) : rscols_(std::move(other.rscols_)) { } - virtual ~QLRSRow(); - - const std::vector& rscols() const { - return rscols_; - } - - size_t rscol_count() const { return rscols_.size(); } - - QLValue *rscol(int32_t index) { - return &rscols_[index]; - } - - CHECKED_STATUS CQLSerialize(const QLClient& client, - const QLRSRowDesc& rsrow_desc, - faststring* buffer) const; - - private: - std::vector rscols_; -}; - -// A set of rsrows. +// A set of rows. class QLResultSet { public: typedef std::shared_ptr SharedPtr; // Constructor and destructor. - QLResultSet(); + QLResultSet(const QLRSRowDesc* rsrow_desc, faststring* rows_data); virtual ~QLResultSet(); - const std::vector& rsrows() const { - return rsrows_; - } + // Allocate a new row at the end of result set. + void AllocateRow(); - // Allocate a new rsrow and append it to the end of result set. - QLRSRow *AllocateRSRow(int32_t rscol_count); + // Append a column to the last row in the result set. + void AppendColumn(size_t index, const QLValue& value); // Row count - size_t rsrow_count() const { return rsrows_.size(); } - - // Serialization routines with CQL encoding format. - CHECKED_STATUS CQLSerialize(const QLClient& client, - const QLRSRowDesc& rsrow_desc, - faststring* buffer) const; + size_t rsrow_count() const; private: - std::vector rsrows_; + const QLRSRowDesc* rsrow_desc_ = nullptr; + faststring* rows_data_ = nullptr; }; } // namespace yb diff --git a/src/yb/common/ql_rowblock.cc b/src/yb/common/ql_rowblock.cc index 77149c76eb58..4c7c93b465a3 100644 --- a/src/yb/common/ql_rowblock.cc +++ b/src/yb/common/ql_rowblock.cc @@ -134,21 +134,17 @@ Status QLRowBlock::Deserialize(const QLClient client, Slice* data) { Result QLRowBlock::GetRowCount(const QLClient client, const string& data) { CHECK_EQ(client, YQL_CLIENT_CQL); - int32_t cnt = 0; Slice slice(data); - RETURN_NOT_OK(CQLDecodeNum(sizeof(cnt), NetworkByteOrder::Load32, &slice, &cnt)); - return cnt; + return VERIFY_RESULT(CQLDecodeLength(&slice)); } Status QLRowBlock::AppendRowsData(const QLClient client, const string& src, string* dst) { CHECK_EQ(client, YQL_CLIENT_CQL); - int32_t src_cnt = 0; Slice src_slice(src); - RETURN_NOT_OK(CQLDecodeNum(sizeof(src_cnt), NetworkByteOrder::Load32, &src_slice, &src_cnt)); + const int32_t src_cnt = VERIFY_RESULT(CQLDecodeLength(&src_slice)); if (src_cnt > 0) { - int32_t dst_cnt = 0; Slice dst_slice(*dst); - RETURN_NOT_OK(CQLDecodeNum(sizeof(dst_cnt), NetworkByteOrder::Load32, &dst_slice, &dst_cnt)); + int32_t dst_cnt = VERIFY_RESULT(CQLDecodeLength(&dst_slice)); if (dst_cnt == 0) { *dst = src; } else { diff --git a/src/yb/common/wire_protocol.h b/src/yb/common/wire_protocol.h index d87c38aeb9c6..827be98f960c 100644 --- a/src/yb/common/wire_protocol.h +++ b/src/yb/common/wire_protocol.h @@ -153,7 +153,8 @@ static inline void CQLEncodeLength(const int32_t length, faststring* buffer) { buffer->append(&byte_value, sizeof(byte_value)); } -// Encode a 32-bit length into the buffer. Caller should ensure the buffer size is at least 4 bytes. +// Encode a 32-bit length into the buffer without extending the buffer. Caller should ensure the +// buffer size is at least 4 bytes. static inline void CQLEncodeLength(const int32_t length, void* buffer) { NetworkByteOrder::Store32(buffer, static_cast(length)); } @@ -224,13 +225,26 @@ static inline void CQLFinishCollection(int32_t start_pos, faststring* buffer) { } \ } while (0) +static inline Result CQLDecodeLength(Slice* data) { + RETURN_NOT_ENOUGH(data, sizeof(int32_t)); + const auto len = static_cast(NetworkByteOrder::Load32(data->data())); + data->remove_prefix(sizeof(int32_t)); + return len; +} + +// Decode a 32-bit length from the buffer without consuming the buffer. Caller should ensure the +// buffer size is at least 4 bytes. +static inline int32_t CQLDecodeLength(const void* buffer) { + return static_cast(NetworkByteOrder::Load32(buffer)); +} + // Decode a CQL number (8, 16, 32 and 64-bit integer). is the parsed integer type. // converts the number from network byte-order to machine order and // is the coverter's return type. The converter's return type is unsigned while // may be signed or unsigned. template static inline CHECKED_STATUS CQLDecodeNum( - size_t len, data_type (*converter)(const void*), Slice* data, num_type* val) { + const size_t len, data_type (*converter)(const void*), Slice* data, num_type* val) { static_assert(sizeof(data_type) == sizeof(num_type), "inconsistent num type size"); if (len != sizeof(num_type)) { @@ -250,7 +264,7 @@ static inline CHECKED_STATUS CQLDecodeNum( // is the coverter's return type. The converter's return type is an integer type. template static inline CHECKED_STATUS CQLDecodeFloat( - size_t len, data_type (*converter)(const void*), Slice* data, float_type* val) { + const size_t len, data_type (*converter)(const void*), Slice* data, float_type* val) { // Make sure float and double are exactly sizeof uint32_t and uint64_t. static_assert(sizeof(float_type) == sizeof(data_type), "inconsistent floating point type size"); data_type bval = 0; diff --git a/src/yb/docdb/doc_operation-test.cc b/src/yb/docdb/doc_operation-test.cc index 5f05407ec3d9..4798b3728e4f 100644 --- a/src/yb/docdb/doc_operation-test.cc +++ b/src/yb/docdb/doc_operation-test.cc @@ -205,21 +205,23 @@ SubDocKey(DocKey(0x0000, [1], []), [ColumnId(3); HT{ w: 2 }]) -> 4 QLRowBlock row_block(schema, vector ({ColumnId(0), ColumnId(1), ColumnId(2), ColumnId(3)})); const Schema& query_schema = row_block.schema(); - QLRSRowDescPB *rsrow_desc = ql_read_req.mutable_rsrow_desc(); + QLRSRowDescPB *rsrow_desc_pb = ql_read_req.mutable_rsrow_desc(); for (int32_t i = 0; i <= 3 ; i++) { ql_read_req.add_selected_exprs()->set_column_id(i); ql_read_req.mutable_column_refs()->add_ids(i); auto col = query_schema.column_by_id(ColumnId(i)); EXPECT_OK(col); - QLRSColDescPB *rscol_desc = rsrow_desc->add_rscol_descs(); + QLRSColDescPB *rscol_desc = rsrow_desc_pb->add_rscol_descs(); rscol_desc->set_name(col->name()); col->type()->ToQLTypePB(rscol_desc->mutable_ql_type()); } QLReadOperation read_op(ql_read_req, kNonTransactionalOperationContext); QLRocksDBStorage ql_storage(doc_db()); - QLResultSet resultset; + const QLRSRowDesc rsrow_desc(*rsrow_desc_pb); + faststring rows_data; + QLResultSet resultset(&rsrow_desc, &rows_data); HybridTime read_restart_ht; EXPECT_OK(read_op.Execute( ql_storage, MonoTime::Max() /* deadline */, ReadHybridTime::SingleTime(read_time), @@ -227,10 +229,8 @@ SubDocKey(DocKey(0x0000, [1], []), [ColumnId(3); HT{ w: 2 }]) -> 4 EXPECT_FALSE(read_restart_ht.is_valid()); // Transfer the column values from result set to rowblock. - for (const auto& rsrow : resultset.rsrows()) { - QLRow& row = row_block.Extend(); - row.SetColumnValues(rsrow.rscols()); - } + Slice data(rows_data.data(), rows_data.size()); + EXPECT_OK(row_block.Deserialize(YQL_CLIENT_CQL, &data)); return row_block; } }; diff --git a/src/yb/docdb/doc_operation.cc b/src/yb/docdb/doc_operation.cc index 2b71a53ffc38..c170d5deff16 100644 --- a/src/yb/docdb/doc_operation.cc +++ b/src/yb/docdb/doc_operation.cc @@ -2912,12 +2912,12 @@ Status QLReadOperation::Execute(const common::YQLStorageIf& ql_storage, CHECKED_STATUS QLReadOperation::PopulateResultSet(const QLTableRow& table_row, QLResultSet *resultset) { - int column_count = request_.selected_exprs().size(); - QLRSRow *rsrow = resultset->AllocateRSRow(column_count); - + resultset->AllocateRow(); int rscol_index = 0; for (const QLExpressionPB& expr : request_.selected_exprs()) { - RETURN_NOT_OK(EvalExpr(expr, table_row, rsrow->rscol(rscol_index))); + QLValue value; + RETURN_NOT_OK(EvalExpr(expr, table_row, &value)); + resultset->AppendColumn(rscol_index, value); rscol_index++; } @@ -2940,10 +2940,10 @@ CHECKED_STATUS QLReadOperation::EvalAggregate(const QLTableRow& table_row) { CHECKED_STATUS QLReadOperation::PopulateAggregate(const QLTableRow& table_row, QLResultSet *resultset) { + resultset->AllocateRow(); int column_count = request_.selected_exprs().size(); - QLRSRow *rsrow = resultset->AllocateRSRow(column_count); for (int rscol_index = 0; rscol_index < column_count; rscol_index++) { - *rsrow->rscol(rscol_index) = aggr_result_[rscol_index]; + resultset->AppendColumn(rscol_index, aggr_result_[rscol_index]); } return Status::OK(); } diff --git a/src/yb/tablet/abstract_tablet.cc b/src/yb/tablet/abstract_tablet.cc index 2ae0b6600440..2c1f480081b6 100644 --- a/src/yb/tablet/abstract_tablet.cc +++ b/src/yb/tablet/abstract_tablet.cc @@ -43,8 +43,8 @@ CHECKED_STATUS AbstractTablet::HandleQLReadRequest( } RETURN_NOT_OK(schema.CreateProjectionByIdsIgnoreMissing(column_refs, &query_schema)); - QLRSRowDesc rsrow_desc(ql_read_request.rsrow_desc()); - QLResultSet resultset; + const QLRSRowDesc rsrow_desc(ql_read_request.rsrow_desc()); + QLResultSet resultset(&rsrow_desc, &result->rows_data); TRACE("Start Execute"); const Status s = doc_op.Execute( QLStorage(), deadline, read_time, schema, query_schema, &resultset, &result->restart_read_ht); @@ -63,15 +63,7 @@ CHECKED_STATUS AbstractTablet::HandleQLReadRequest( RETURN_NOT_OK(CreatePagingStateForRead( ql_read_request, resultset.rsrow_count(), &result->response)); - // TODO(neil) The clients' request should indicate what encoding method should be used. When - // multi-shard is used to process more complicated queries, proxy-server might prefer a different - // encoding. For now, we'll call CQLSerialize() without checking encoding method. result->response.set_status(QLResponsePB::YQL_STATUS_OK); - TRACE("Start Serialize"); - RETURN_NOT_OK(resultset.CQLSerialize(ql_read_request.client(), - rsrow_desc, - &result->rows_data)); - TRACE("Done Serialize"); return Status::OK(); }