Skip to content

Commit

Permalink
Fixes #469: Optimize QLResultSet to serialize rows results directly
Browse files Browse the repository at this point in the history
Summary: Optimize QLResultSet by avoiding memory allocation and copying of rows by serializing the rows directly. This improves CassandraPersonalization read workload which does range scan by 10%.

Test Plan:
Load data:
$ java -jar ~/code/yugabyte/java/yb-loadtester/target/yb-sample-apps.jar -workload CassandraPersonalization -num_threads_read 0 -num_threads_write 1 -num_new_coupons_per_customer 3000 -nodes 127.0.0.1:9042 -nouuid -num_unique_keys 300

Read test:
$ java -jar ~/code/yugabyte/java/yb-loadtester/target/yb-sample-apps.jar -workload CassandraPersonalization -nodes 127.0.0.1:9042 -nouuid -max_written_key 300 -read_only -num_threads_write 0 -num_threads_read 1

Before:

```
2018-09-12 11:22:56,052 [INFO|com.yugabyte.sample.common.metrics.MetricsTracker|MetricsTracker] Read: 42.17 ops/sec (23.69 ms/op), 2129 total ops  |  Write: 0.00 ops/sec (0.00 ms/op), 0 total ops  |  Uptime: 50030 ms | maxWrittenKey: 300 | maxGeneratedKey: 300 |
2018-09-12 11:23:01,056 [INFO|com.yugabyte.sample.common.metrics.MetricsTracker|MetricsTracker] Read: 41.96 ops/sec (23.79 ms/op), 2339 total ops  |  Write: 0.00 ops/sec (0.00 ms/op), 0 total ops  |  Uptime: 55034 ms | maxWrittenKey: 300 | maxGeneratedKey: 300 |
2018-09-12 11:23:06,059 [INFO|com.yugabyte.sample.common.metrics.MetricsTracker|MetricsTracker] Read: 42.78 ops/sec (23.40 ms/op), 2553 total ops  |  Write: 0.00 ops/sec (0.00 ms/op), 0 total ops  |  Uptime: 60037 ms | maxWrittenKey: 300 | maxGeneratedKey: 300 |
```

After:

```
2018-09-12 11:24:28,430 [INFO|com.yugabyte.sample.common.metrics.MetricsTracker|MetricsTracker] Read: 46.56 ops/sec (21.50 ms/op), 2310 total ops  |  Write: 0.00 ops/sec (0.00 ms/op), 0 total ops  |  Uptime: 50026 ms | maxWrittenKey: 300 | maxGeneratedKey: 300 |
2018-09-12 11:24:33,432 [INFO|com.yugabyte.sample.common.metrics.MetricsTracker|MetricsTracker] Read: 46.37 ops/sec (21.53 ms/op), 2542 total ops  |  Write: 0.00 ops/sec (0.00 ms/op), 0 total ops  |  Uptime: 55028 ms | maxWrittenKey: 300 | maxGeneratedKey: 300 |
2018-09-12 11:24:38,435 [INFO|com.yugabyte.sample.common.metrics.MetricsTracker|MetricsTracker] Read: 46.57 ops/sec (21.49 ms/op), 2775 total ops  |  Write: 0.00 ops/sec (0.00 ms/op), 0 total ops  |  Uptime: 60031 ms | maxWrittenKey: 300 | maxGeneratedKey: 300 |
```

Reviewers: mihnea

Reviewed By: mihnea

Subscribers: yql

Differential Revision: https://phabricator.dev.yugabyte.com/D5465
  • Loading branch information
robertpang committed Sep 12, 2018
1 parent 3735763 commit afef8f2
Show file tree
Hide file tree
Showing 7 changed files with 65 additions and 116 deletions.
46 changes: 12 additions & 34 deletions src/yb/common/ql_resultset.cc
Original file line number Diff line number Diff line change
Expand Up @@ -14,9 +14,8 @@ namespace yb {

QLRSRowDesc::QLRSRowDesc(const QLRSRowDescPB& desc_pb) {
rscol_descs_.reserve(desc_pb.rscol_descs().size());
for (auto rscol_desc_pb : desc_pb.rscol_descs()) {
rscol_descs_.emplace_back(rscol_desc_pb.name(),
QLType::FromQLTypePB(rscol_desc_pb.ql_type()));
for (const auto& rscol_desc_pb : desc_pb.rscol_descs()) {
rscol_descs_.emplace_back(rscol_desc_pb);
}
}

Expand All @@ -25,45 +24,24 @@ QLRSRowDesc::~QLRSRowDesc() {

//--------------------------------------------------------------------------------------------------

QLRSRow::QLRSRow(int32_t rscol_count) : rscols_(rscol_count) {
QLResultSet::QLResultSet(const QLRSRowDesc* rsrow_desc, faststring* rows_data)
: rsrow_desc_(rsrow_desc), rows_data_(rows_data) {
CQLEncodeLength(0, rows_data_);
}

QLRSRow::~QLRSRow() {
}

CHECKED_STATUS QLRSRow::CQLSerialize(const QLClient& client,
const QLRSRowDesc& rsrow_desc,
faststring* buffer) const {
DCHECK_EQ(rsrow_desc.rscol_count(), rscol_count()) << "Wrong count of fields in result set";
int idx = 0;
for (auto rscol_desc : rsrow_desc.rscol_descs()) {
rscols_[idx].Serialize(rscol_desc.ql_type(), client, buffer);
idx++;
}
return Status::OK();
}

//--------------------------------------------------------------------------------------------------

QLResultSet::QLResultSet() {
QLResultSet::~QLResultSet() {
}

QLResultSet::~QLResultSet() {
void QLResultSet::AllocateRow() {
CQLEncodeLength(CQLDecodeLength(rows_data_->data()) + 1, rows_data_->data());
}

QLRSRow *QLResultSet::AllocateRSRow(int32_t rscol_count) {
rsrows_.emplace_back(rscol_count);
return &rsrows_.back();
void QLResultSet::AppendColumn(const size_t index, const QLValue& value) {
value.Serialize(rsrow_desc_->rscol_descs()[index].ql_type(), YQL_CLIENT_CQL, rows_data_);
}

CHECKED_STATUS QLResultSet::CQLSerialize(const QLClient& client,
const QLRSRowDesc& rsrow_desc,
faststring* buffer) const {
CQLEncodeLength(rsrows_.size(), buffer);
for (const auto& rsrow : rsrows_) {
RETURN_NOT_OK(rsrow.CQLSerialize(client, rsrow_desc, buffer));
}
return Status::OK();
size_t QLResultSet::rsrow_count() const {
return CQLDecodeLength(rows_data_->data());
}

} // namespace yb
67 changes: 18 additions & 49 deletions src/yb/common/ql_resultset.h
Original file line number Diff line number Diff line change
Expand Up @@ -2,9 +2,7 @@
// Copyright (c) YugaByte, Inc.
//
// This module defines the ResultSet that QL database returns to a query request.
// ResultSet is a set of rows of data that is returned by a query request, and each of selected row
// is name "rsrow" in our code. We don't call "rsrow" tuple to avoid conflict with TUPLE datatype
// in Apache Cassandra.
// QLResultSet is a set of rows of data that is returned by a query request.
// - Within our code, we call it "rsrow" instead of row to distinguish between selected-rows and
// rows of a table in the database.
// - Similarly, we use "rscol" in place of "column".
Expand All @@ -13,6 +11,9 @@
// NOTE:
// - This should be merged or shared a super class with ql_rowblock.cc.
// - This will be done in the next diff. We don't do this now to avoid large code modifications.
// - For optimization, columns and rows are serialized (in CQL wire format) directly for return to
// call. If there is a need to manipulate the rows before return, QLResultSet should be changed to
// an interface with multiple implementations for different use-cases.
//--------------------------------------------------------------------------------------------------

#ifndef YB_COMMON_QL_RESULTSET_H_
Expand All @@ -28,19 +29,17 @@ class QLRSRowDesc {
public:
class RSColDesc {
public:
RSColDesc() {
explicit RSColDesc(const QLRSColDescPB& desc_pb)
: name_(desc_pb.name()), ql_type_(QLType::FromQLTypePB(desc_pb.ql_type())) {
}
RSColDesc(const string& name, const QLType::SharedPtr& ql_type)
: name_(name), ql_type_(ql_type) {
}
const string& name() {
const std::string& name() const {
return name_;
}
const QLType::SharedPtr& ql_type() {
const QLType::SharedPtr& ql_type() const {
return ql_type_;
}
private:
string name_;
std::string name_;
QLType::SharedPtr ql_type_;
};

Expand All @@ -60,57 +59,27 @@ class QLRSRowDesc {
};

//--------------------------------------------------------------------------------------------------
// A rsrow represents the values of a row in the resultset.
class QLRSRow {
public:
explicit QLRSRow(int32_t rscol_count);
QLRSRow(QLRSRow&& other) : rscols_(std::move(other.rscols_)) { }
virtual ~QLRSRow();

const std::vector<QLValue>& rscols() const {
return rscols_;
}

size_t rscol_count() const { return rscols_.size(); }

QLValue *rscol(int32_t index) {
return &rscols_[index];
}

CHECKED_STATUS CQLSerialize(const QLClient& client,
const QLRSRowDesc& rsrow_desc,
faststring* buffer) const;

private:
std::vector<QLValue> rscols_;
};

// A set of rsrows.
// A set of rows.
class QLResultSet {
public:
typedef std::shared_ptr<QLResultSet> SharedPtr;

// Constructor and destructor.
QLResultSet();
QLResultSet(const QLRSRowDesc* rsrow_desc, faststring* rows_data);
virtual ~QLResultSet();

const std::vector<QLRSRow>& rsrows() const {
return rsrows_;
}
// Allocate a new row at the end of result set.
void AllocateRow();

// Allocate a new rsrow and append it to the end of result set.
QLRSRow *AllocateRSRow(int32_t rscol_count);
// Append a column to the last row in the result set.
void AppendColumn(size_t index, const QLValue& value);

// Row count
size_t rsrow_count() const { return rsrows_.size(); }

// Serialization routines with CQL encoding format.
CHECKED_STATUS CQLSerialize(const QLClient& client,
const QLRSRowDesc& rsrow_desc,
faststring* buffer) const;
size_t rsrow_count() const;

private:
std::vector<QLRSRow> rsrows_;
const QLRSRowDesc* rsrow_desc_ = nullptr;
faststring* rows_data_ = nullptr;
};

} // namespace yb
Expand Down
10 changes: 3 additions & 7 deletions src/yb/common/ql_rowblock.cc
Original file line number Diff line number Diff line change
Expand Up @@ -134,21 +134,17 @@ Status QLRowBlock::Deserialize(const QLClient client, Slice* data) {

Result<size_t> QLRowBlock::GetRowCount(const QLClient client, const string& data) {
CHECK_EQ(client, YQL_CLIENT_CQL);
int32_t cnt = 0;
Slice slice(data);
RETURN_NOT_OK(CQLDecodeNum(sizeof(cnt), NetworkByteOrder::Load32, &slice, &cnt));
return cnt;
return VERIFY_RESULT(CQLDecodeLength(&slice));
}

Status QLRowBlock::AppendRowsData(const QLClient client, const string& src, string* dst) {
CHECK_EQ(client, YQL_CLIENT_CQL);
int32_t src_cnt = 0;
Slice src_slice(src);
RETURN_NOT_OK(CQLDecodeNum(sizeof(src_cnt), NetworkByteOrder::Load32, &src_slice, &src_cnt));
const int32_t src_cnt = VERIFY_RESULT(CQLDecodeLength(&src_slice));
if (src_cnt > 0) {
int32_t dst_cnt = 0;
Slice dst_slice(*dst);
RETURN_NOT_OK(CQLDecodeNum(sizeof(dst_cnt), NetworkByteOrder::Load32, &dst_slice, &dst_cnt));
int32_t dst_cnt = VERIFY_RESULT(CQLDecodeLength(&dst_slice));
if (dst_cnt == 0) {
*dst = src;
} else {
Expand Down
20 changes: 17 additions & 3 deletions src/yb/common/wire_protocol.h
Original file line number Diff line number Diff line change
Expand Up @@ -153,7 +153,8 @@ static inline void CQLEncodeLength(const int32_t length, faststring* buffer) {
buffer->append(&byte_value, sizeof(byte_value));
}

// Encode a 32-bit length into the buffer. Caller should ensure the buffer size is at least 4 bytes.
// Encode a 32-bit length into the buffer without extending the buffer. Caller should ensure the
// buffer size is at least 4 bytes.
static inline void CQLEncodeLength(const int32_t length, void* buffer) {
NetworkByteOrder::Store32(buffer, static_cast<uint32_t>(length));
}
Expand Down Expand Up @@ -224,13 +225,26 @@ static inline void CQLFinishCollection(int32_t start_pos, faststring* buffer) {
} \
} while (0)

static inline Result<int32_t> CQLDecodeLength(Slice* data) {
RETURN_NOT_ENOUGH(data, sizeof(int32_t));
const auto len = static_cast<int32_t>(NetworkByteOrder::Load32(data->data()));
data->remove_prefix(sizeof(int32_t));
return len;
}

// Decode a 32-bit length from the buffer without consuming the buffer. Caller should ensure the
// buffer size is at least 4 bytes.
static inline int32_t CQLDecodeLength(const void* buffer) {
return static_cast<int32_t>(NetworkByteOrder::Load32(buffer));
}

// Decode a CQL number (8, 16, 32 and 64-bit integer). <num_type> is the parsed integer type.
// <converter> converts the number from network byte-order to machine order and <data_type>
// is the coverter's return type. The converter's return type <data_type> is unsigned while
// <num_type> may be signed or unsigned.
template<typename num_type, typename data_type>
static inline CHECKED_STATUS CQLDecodeNum(
size_t len, data_type (*converter)(const void*), Slice* data, num_type* val) {
const size_t len, data_type (*converter)(const void*), Slice* data, num_type* val) {

static_assert(sizeof(data_type) == sizeof(num_type), "inconsistent num type size");
if (len != sizeof(num_type)) {
Expand All @@ -250,7 +264,7 @@ static inline CHECKED_STATUS CQLDecodeNum(
// is the coverter's return type. The converter's return type <data_type> is an integer type.
template<typename float_type, typename data_type>
static inline CHECKED_STATUS CQLDecodeFloat(
size_t len, data_type (*converter)(const void*), Slice* data, float_type* val) {
const size_t len, data_type (*converter)(const void*), Slice* data, float_type* val) {
// Make sure float and double are exactly sizeof uint32_t and uint64_t.
static_assert(sizeof(float_type) == sizeof(data_type), "inconsistent floating point type size");
data_type bval = 0;
Expand Down
14 changes: 7 additions & 7 deletions src/yb/docdb/doc_operation-test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -205,32 +205,32 @@ SubDocKey(DocKey(0x0000, [1], []), [ColumnId(3); HT{ <max> w: 2 }]) -> 4
QLRowBlock row_block(schema, vector<ColumnId> ({ColumnId(0), ColumnId(1), ColumnId(2),
ColumnId(3)}));
const Schema& query_schema = row_block.schema();
QLRSRowDescPB *rsrow_desc = ql_read_req.mutable_rsrow_desc();
QLRSRowDescPB *rsrow_desc_pb = ql_read_req.mutable_rsrow_desc();
for (int32_t i = 0; i <= 3 ; i++) {
ql_read_req.add_selected_exprs()->set_column_id(i);
ql_read_req.mutable_column_refs()->add_ids(i);

auto col = query_schema.column_by_id(ColumnId(i));
EXPECT_OK(col);
QLRSColDescPB *rscol_desc = rsrow_desc->add_rscol_descs();
QLRSColDescPB *rscol_desc = rsrow_desc_pb->add_rscol_descs();
rscol_desc->set_name(col->name());
col->type()->ToQLTypePB(rscol_desc->mutable_ql_type());
}

QLReadOperation read_op(ql_read_req, kNonTransactionalOperationContext);
QLRocksDBStorage ql_storage(doc_db());
QLResultSet resultset;
const QLRSRowDesc rsrow_desc(*rsrow_desc_pb);
faststring rows_data;
QLResultSet resultset(&rsrow_desc, &rows_data);
HybridTime read_restart_ht;
EXPECT_OK(read_op.Execute(
ql_storage, MonoTime::Max() /* deadline */, ReadHybridTime::SingleTime(read_time),
schema, query_schema, &resultset, &read_restart_ht));
EXPECT_FALSE(read_restart_ht.is_valid());

// Transfer the column values from result set to rowblock.
for (const auto& rsrow : resultset.rsrows()) {
QLRow& row = row_block.Extend();
row.SetColumnValues(rsrow.rscols());
}
Slice data(rows_data.data(), rows_data.size());
EXPECT_OK(row_block.Deserialize(YQL_CLIENT_CQL, &data));
return row_block;
}
};
Expand Down
12 changes: 6 additions & 6 deletions src/yb/docdb/doc_operation.cc
Original file line number Diff line number Diff line change
Expand Up @@ -2912,12 +2912,12 @@ Status QLReadOperation::Execute(const common::YQLStorageIf& ql_storage,

CHECKED_STATUS QLReadOperation::PopulateResultSet(const QLTableRow& table_row,
QLResultSet *resultset) {
int column_count = request_.selected_exprs().size();
QLRSRow *rsrow = resultset->AllocateRSRow(column_count);

resultset->AllocateRow();
int rscol_index = 0;
for (const QLExpressionPB& expr : request_.selected_exprs()) {
RETURN_NOT_OK(EvalExpr(expr, table_row, rsrow->rscol(rscol_index)));
QLValue value;
RETURN_NOT_OK(EvalExpr(expr, table_row, &value));
resultset->AppendColumn(rscol_index, value);
rscol_index++;
}

Expand All @@ -2940,10 +2940,10 @@ CHECKED_STATUS QLReadOperation::EvalAggregate(const QLTableRow& table_row) {

CHECKED_STATUS QLReadOperation::PopulateAggregate(const QLTableRow& table_row,
QLResultSet *resultset) {
resultset->AllocateRow();
int column_count = request_.selected_exprs().size();
QLRSRow *rsrow = resultset->AllocateRSRow(column_count);
for (int rscol_index = 0; rscol_index < column_count; rscol_index++) {
*rsrow->rscol(rscol_index) = aggr_result_[rscol_index];
resultset->AppendColumn(rscol_index, aggr_result_[rscol_index]);
}
return Status::OK();
}
Expand Down
12 changes: 2 additions & 10 deletions src/yb/tablet/abstract_tablet.cc
Original file line number Diff line number Diff line change
Expand Up @@ -43,8 +43,8 @@ CHECKED_STATUS AbstractTablet::HandleQLReadRequest(
}
RETURN_NOT_OK(schema.CreateProjectionByIdsIgnoreMissing(column_refs, &query_schema));

QLRSRowDesc rsrow_desc(ql_read_request.rsrow_desc());
QLResultSet resultset;
const QLRSRowDesc rsrow_desc(ql_read_request.rsrow_desc());
QLResultSet resultset(&rsrow_desc, &result->rows_data);
TRACE("Start Execute");
const Status s = doc_op.Execute(
QLStorage(), deadline, read_time, schema, query_schema, &resultset, &result->restart_read_ht);
Expand All @@ -63,15 +63,7 @@ CHECKED_STATUS AbstractTablet::HandleQLReadRequest(
RETURN_NOT_OK(CreatePagingStateForRead(
ql_read_request, resultset.rsrow_count(), &result->response));

// TODO(neil) The clients' request should indicate what encoding method should be used. When
// multi-shard is used to process more complicated queries, proxy-server might prefer a different
// encoding. For now, we'll call CQLSerialize() without checking encoding method.
result->response.set_status(QLResponsePB::YQL_STATUS_OK);
TRACE("Start Serialize");
RETURN_NOT_OK(resultset.CQLSerialize(ql_read_request.client(),
rsrow_desc,
&result->rows_data));
TRACE("Done Serialize");
return Status::OK();
}

Expand Down

0 comments on commit afef8f2

Please sign in to comment.