Skip to content

Commit

Permalink
#407: Refactor CQL write dependency check
Browse files Browse the repository at this point in the history
Summary: Refactor the inter-dependency check of YCQL read/write operations to be done between the executor and the read/write operations.

Test Plan: Jenkins

Reviewers: mihnea

Reviewed By: mihnea

Subscribers: yql

Differential Revision: https://phabricator.dev.yugabyte.com/D5225
  • Loading branch information
robertpang committed Jul 30, 2018
1 parent e64c09b commit 74751e8
Show file tree
Hide file tree
Showing 9 changed files with 159 additions and 106 deletions.
24 changes: 24 additions & 0 deletions src/yb/client/yb_op.cc
Original file line number Diff line number Diff line change
Expand Up @@ -211,6 +211,30 @@ uint16_t YBqlWriteOp::GetHashCode() const {
return ql_write_request_->hash_code();
}

bool YBqlWriteOp::ReadsStaticRow() const {
// A QL write op reads the static row if it reads a static column, or it writes to the static row
// and has a user-defined timestamp (which DocDB requires a read-modify-write by the timestamp).
return !ql_write_request_->column_refs().static_ids().empty() ||
(writes_static_row_ && ql_write_request_->has_user_timestamp_usec());
}

bool YBqlWriteOp::ReadsPrimaryRow() const {
// A QL write op reads the primary row reads a non-static column, it writes to the primary row
// and has a user-defined timestamp (which DocDB requires a read-modify-write by the timestamp),
// or if there is an IF clause.
return !ql_write_request_->column_refs().ids().empty() ||
(writes_primary_row_ && ql_write_request_->has_user_timestamp_usec()) ||
ql_write_request_->has_if_expr();
}

bool YBqlWriteOp::WritesStaticRow() const {
return writes_static_row_;
}

bool YBqlWriteOp::WritesPrimaryRow() const {
return writes_primary_row_;
}

// YBqlWriteOp::HashHash/Equal ---------------------------------------------------------------
size_t YBqlWriteOp::HashKeyComparator::operator() (const YBqlWriteOpPtr& op) const {
size_t hash = 0;
Expand Down
13 changes: 13 additions & 0 deletions src/yb/client/yb_op.h
Original file line number Diff line number Diff line change
Expand Up @@ -286,6 +286,15 @@ class YBqlWriteOp : public YBqlOp {
bool operator() (const YBqlWriteOpPtr& op1, const YBqlWriteOpPtr& op2) const override;
};

// Does this operation read/write the static or primary row?
bool ReadsStaticRow() const;
bool ReadsPrimaryRow() const;
bool WritesStaticRow() const;
bool WritesPrimaryRow() const;

void set_writes_static_row(const bool value) { writes_static_row_ = value; }
void set_writes_primary_row(const bool value) { writes_primary_row_ = value; }

protected:
virtual Type type() const override {
return QL_WRITE;
Expand All @@ -297,6 +306,10 @@ class YBqlWriteOp : public YBqlOp {
static YBqlWriteOp *NewUpdate(const std::shared_ptr<YBTable>& table);
static YBqlWriteOp *NewDelete(const std::shared_ptr<YBTable>& table);
std::unique_ptr<QLWriteRequestPB> ql_write_request_;

// Does this operation write to the static or primary row?
bool writes_static_row_ = false;
bool writes_primary_row_ = false;
};

class YBqlReadOp : public YBqlOp {
Expand Down
128 changes: 64 additions & 64 deletions src/yb/yql/cql/ql/exec/executor.cc
Original file line number Diff line number Diff line change
Expand Up @@ -930,6 +930,10 @@ Status Executor::ExecPTNode(const PTInsertStmt *tnode) {
req->set_returns_status(true);
}

// Set whether write op writes to the static/primary row.
insert_op->set_writes_static_row(tnode->ModifiesStaticRow());
insert_op->set_writes_primary_row(tnode->ModifiesPrimaryRow());

// Apply the operation.
return ApplyOperation(tnode, insert_op);
}
Expand Down Expand Up @@ -980,6 +984,10 @@ Status Executor::ExecPTNode(const PTDeleteStmt *tnode) {
req->set_returns_status(true);
}

// Set whether write op writes to the static/primary row.
delete_op->set_writes_static_row(tnode->ModifiesStaticRow());
delete_op->set_writes_primary_row(tnode->ModifiesPrimaryRow());

// Apply the operation.
return ApplyOperation(tnode, delete_op);
}
Expand Down Expand Up @@ -1038,6 +1046,10 @@ Status Executor::ExecPTNode(const PTUpdateStmt *tnode) {
req->set_returns_status(true);
}

// Set whether write op writes to the static/primary row.
update_op->set_writes_static_row(tnode->ModifiesStaticRow());
update_op->set_writes_primary_row(tnode->ModifiesPrimaryRow());

// Apply the operation.
return ApplyOperation(tnode, update_op);
}
Expand Down Expand Up @@ -1103,8 +1115,7 @@ Status Executor::ExecPTNode(const PTUseKeyspace *tnode) {

void Executor::FlushAsync() {
exec_context().inc_num_flushes();
batched_writes_by_primary_key_.clear();
batched_writes_by_hash_key_.clear();
write_batch_.Clear();
if (!ql_env_->FlushAsync(&flush_async_cb_)) {
StatementExecuted(Status::OK());
}
Expand Down Expand Up @@ -1138,8 +1149,7 @@ void Executor::FlushAsyncDone(const Status &s, const bool rescheduled_call) {
// If the operation has been deferred continue to the next operation, but if it can be
// applied now, apply it first.
if (tnode_context.IsDeferred()) {
if (!DeferOperation(static_cast<const PTDmlStmt *>(tnode),
std::static_pointer_cast<YBqlWriteOp>(main_op))) {
if (write_batch_.Add(std::static_pointer_cast<YBqlWriteOp>(main_op))) {
RETURN_STMT_NOT_OK(tnode_context.Apply(ql_env_));
}
found_deferred_stmts = true;
Expand Down Expand Up @@ -1351,7 +1361,7 @@ Status Executor::UpdateIndexes(const PTDmlStmt *tnode, QLWriteRequestPB *req) {
}
}

if (!req->update_index_ids().empty() && tnode->RequireTransaction()) {
if (!req->update_index_ids().empty() && tnode->RequiresTransaction()) {
RETURN_NOT_OK(ql_env_->PrepareChildTransaction(req->mutable_child_transaction_data()));
}
return Status::OK();
Expand Down Expand Up @@ -1383,6 +1393,7 @@ Status Executor::ApplyIndexWriteOps(const PTDmlStmt *tnode, const QLWriteRequest
VERIFY_RESULT(tnode->table()->index_map().FindIndex(index_table->id()));
shared_ptr<YBqlWriteOp> index_op(
is_upsert ? index_table->NewQLInsert() : index_table->NewQLDelete());
index_op->set_writes_primary_row(true);
QLWriteRequestPB *index_req = index_op->mutable_request();
index_req->set_request_id(req.request_id());
index_req->set_query_id(req.query_id());
Expand All @@ -1402,41 +1413,36 @@ Status Executor::ApplyIndexWriteOps(const PTDmlStmt *tnode, const QLWriteRequest
}
}
parent.AddTnode(tnode);
RETURN_NOT_OK(exec_context().Apply(index_op, DeferOperation(tnode, index_op)));
RETURN_NOT_OK(exec_context().Apply(index_op, !write_batch_.Add(index_op)));
}

return Status::OK();
}

//--------------------------------------------------------------------------------------------------

bool Executor::DeferOperation(const PTDmlStmt *tnode, const YBqlWriteOpPtr& op) {
// Defer the given write operation if 2 conditions are met:
// 1) It fails to be added to batched_writes_ because the primary / hash key collides with that
// of a prior write operation in the current write batch, and
// 2) It requires a read. We need to defer this latter colliding write operation that requires
// a read because currently we cannot read the results of a prior write operation to the same
// primary / hash key until the prior write batch has been applied in tserver. We need to defer
// the operation to the next batch.
// If the latter write operation collides with the prior one but does not require a read, it is
// okay to apply in the same batch. Our semantics allows the latter write operation to overwrite
// the prior one.
const bool has_usertimestamp = op->request().has_user_timestamp_usec();

const bool defer =
(tnode->ReadsPrimaryRow(has_usertimestamp) && batched_writes_by_primary_key_.count(op) > 0) ||
(tnode->ReadsStaticRow(has_usertimestamp) && batched_writes_by_hash_key_.count(op) > 0);

if (!defer) {
if (tnode->ModifiesPrimaryRow()) batched_writes_by_primary_key_.insert(op);
if (tnode->ModifiesStaticRow()) batched_writes_by_hash_key_.insert(op);
}

return defer;
bool Executor::WriteBatch::Add(const YBqlWriteOpPtr& op) {
// Checks if the write operation reads the primary/static row and if another operation that writes
// the primary/static row by the same primary/hash key already exists.
if ((op->ReadsPrimaryRow() && ops_by_primary_key_.count(op) > 0) ||
(op->ReadsStaticRow() && ops_by_hash_key_.count(op) > 0)) {
return false;
}

if (op->WritesPrimaryRow()) { ops_by_primary_key_.insert(op); }
if (op->WritesStaticRow()) { ops_by_hash_key_.insert(op); }
return true;
}

void Executor::WriteBatch::Clear() {
ops_by_primary_key_.clear();
ops_by_hash_key_.clear();
}

//--------------------------------------------------------------------------------------------------

Status Executor::ApplyOperation(const PTDmlStmt *tnode, const YBqlWriteOpPtr& op) {
RETURN_NOT_OK(exec_context().Apply(op, DeferOperation(tnode, op)));
RETURN_NOT_OK(exec_context().Apply(op, !write_batch_.Add(op)));

return UpdateIndexes(tnode, op->mutable_request());
}
Expand Down Expand Up @@ -1475,8 +1481,8 @@ Status Executor::ProcessStatementStatus(const ParseTree& parse_tree, const Statu
return s;
}

Status Executor::ProcessOpResponse(client::YBqlOp* op,
const TreeNode* tnode,
Status Executor::ProcessOpResponse(const PTDmlStmt* stmt,
client::YBqlOp* op,
ExecContext* exec_context) {
const QLResponsePB &resp = op->response();
CHECK(resp.has_status()) << "QLResponsePB status missing";
Expand All @@ -1485,36 +1491,31 @@ Status Executor::ProcessOpResponse(client::YBqlOp* op,
return STATUS(TryAgain, resp.error_message());
}

if (tnode->opcode() == TreeNodeOpcode::kPTInsertStmt ||
tnode->opcode() == TreeNodeOpcode::kPTUpdateStmt ||
tnode->opcode() == TreeNodeOpcode::kPTDeleteStmt) {
const auto* stmt = static_cast<const PTDmlStmt *>(tnode);
if (stmt->returns_status()) {
// If we got an error we need to manually produce a result.
auto columns = std::make_shared<std::vector<ColumnSchema>>();
columns->emplace_back("[applied]", DataType::BOOL);
columns->emplace_back("[message]", DataType::STRING);

columns->insert(columns->end(), stmt->table()->schema().columns().begin(),
stmt->table()->schema().columns().end());

QLRowBlock result_row_block(Schema(*columns, 0));
QLRow& row = result_row_block.Extend();
row.mutable_column(0)->set_bool_value(false);
row.mutable_column(1)->set_string_value(resp.error_message());
// Leave the rest of the columns null in this case.

faststring buffer;
result_row_block.Serialize(YQL_CLIENT_CQL, &buffer);
shared_ptr<RowsResult> result =
std::make_shared<RowsResult>(stmt->table()->name(), columns, buffer.ToString());

return AppendResult(result);
}
if (stmt->IsWriteOp() && stmt->returns_status()) {
// If we got an error we need to manually produce a result.
auto columns = std::make_shared<std::vector<ColumnSchema>>();
columns->emplace_back("[applied]", DataType::BOOL);
columns->emplace_back("[message]", DataType::STRING);
columns->insert(columns->end(),
stmt->table()->schema().columns().begin(),
stmt->table()->schema().columns().end());

QLRowBlock result_row_block(Schema(*columns, 0));
QLRow& row = result_row_block.Extend();
row.mutable_column(0)->set_bool_value(false);
row.mutable_column(1)->set_string_value(resp.error_message());
// Leave the rest of the columns null in this case.

faststring buffer;
result_row_block.Serialize(YQL_CLIENT_CQL, &buffer);
shared_ptr<RowsResult> result =
std::make_shared<RowsResult>(stmt->table()->name(), columns, buffer.ToString());

return AppendResult(result);
}

const ErrorCode errcode = QLStatusToErrorCode(resp.status());
return exec_context->Error(tnode, resp.error_message().c_str(), errcode);
return exec_context->Error(stmt, resp.error_message().c_str(), errcode);
}
return op->rows_data().empty() ? Status::OK() : AppendResult(std::make_shared<RowsResult>(op));
}
Expand All @@ -1531,12 +1532,12 @@ Status Executor::ProcessAsyncResults(const Status& s,
Status ss = ql_env_->GetOpError(op.get());
if (PREDICT_FALSE(!ss.ok() && !ss.IsTryAgain())) {
// YBOperation returns not-found error when the tablet is not found.
const auto
errcode = ss.IsNotFound() ? ErrorCode::TABLET_NOT_FOUND : ErrorCode::EXEC_ERROR;
const auto errcode = ss.IsNotFound() ? ErrorCode::TABLET_NOT_FOUND : ErrorCode::EXEC_ERROR;
ss = exec_context->Error(tnode, ss, errcode);
}
if (ss.ok()) {
ss = ProcessOpResponse(op.get(), tnode, exec_context);
DCHECK(tnode->IsDml()) << "Only DML should produce an op response";
ss = ProcessOpResponse(static_cast<const PTDmlStmt *>(tnode), op.get(), exec_context);
}
RETURN_NOT_OK(ProcessStatementStatus(exec_context->parse_tree(), ss));
}
Expand Down Expand Up @@ -1623,8 +1624,7 @@ void Executor::StatementExecuted(const Status& s) {

void Executor::Reset() {
exec_contexts_.clear();
batched_writes_by_primary_key_.clear();
batched_writes_by_hash_key_.clear();
write_batch_.Clear();
result_ = nullptr;
dml_batch_table_ = nullptr;
returns_status_batch_opt_ = boost::none;
Expand Down
40 changes: 27 additions & 13 deletions src/yb/yql/cql/ql/exec/executor.h
Original file line number Diff line number Diff line change
Expand Up @@ -159,8 +159,8 @@ class Executor : public QLExprExecutor {
CHECKED_STATUS ProcessStatementStatus(const ParseTree& parse_tree, const Status& s);

// Process the read/write op response.
CHECKED_STATUS ProcessOpResponse(client::YBqlOp* op,
const TreeNode* tnode,
CHECKED_STATUS ProcessOpResponse(const PTDmlStmt* stmt,
client::YBqlOp* op,
ExecContext* exec_context);

// Process result of FlushAsyncDone.
Expand Down Expand Up @@ -304,7 +304,7 @@ class Executor : public QLExprExecutor {
CHECKED_STATUS FuncOpToPB(QLConditionPB *condition, const FuncOp& func_op);

//------------------------------------------------------------------------------------------------
bool DeferOperation(const PTDmlStmt *tnode, const client::YBqlWriteOpPtr& op);
bool DeferOperation(const client::YBqlWriteOpPtr& op);
CHECKED_STATUS ApplyOperation(const PTDmlStmt *tnode, const client::YBqlWriteOpPtr& op);

//------------------------------------------------------------------------------------------------
Expand All @@ -314,6 +314,28 @@ class Executor : public QLExprExecutor {
//------------------------------------------------------------------------------------------------
ExecContext& exec_context();

//------------------------------------------------------------------------------------------------
// Helper class to separate inter-dependent write operations.
class WriteBatch {
public:
// Add a write operation. Returns true if it does not depend on another operation in the batch.
// Returns false if it does and is not added. In that case, the operation needs to be deferred
// until the dependent operation has been applied.
bool Add(const client::YBqlWriteOpPtr& op);

// Clear the batch.
void Clear();

private:
// Sets of write operations separated by their primary and keys.
std::unordered_set<client::YBqlWriteOpPtr,
client::YBqlWriteOp::PrimaryKeyComparator,
client::YBqlWriteOp::PrimaryKeyComparator> ops_by_primary_key_;
std::unordered_set<client::YBqlWriteOpPtr,
client::YBqlWriteOp::HashKeyComparator,
client::YBqlWriteOp::HashKeyComparator> ops_by_hash_key_;
};

//------------------------------------------------------------------------------------------------
// Environment (YBClient) for executing statements.
QLEnv *ql_env_;
Expand All @@ -322,16 +344,8 @@ class Executor : public QLExprExecutor {
// execution.
std::list<ExecContext> exec_contexts_;

// Set of write operations that have been applied (separated by their primary keys).
std::unordered_set<client::YBqlWriteOpPtr,
client::YBqlWriteOp::PrimaryKeyComparator,
client::YBqlWriteOp::PrimaryKeyComparator> batched_writes_by_primary_key_;

// Set of write operations referencing a static column that have been applied (separated by their
// hash keys).
std::unordered_set<client::YBqlWriteOpPtr,
client::YBqlWriteOp::HashKeyComparator,
client::YBqlWriteOp::HashKeyComparator> batched_writes_by_hash_key_;
// Batch of outstanding write operations that are being applied.
WriteBatch write_batch_;

// Execution result.
ExecutedResult::SharedPtr result_;
Expand Down
7 changes: 3 additions & 4 deletions src/yb/yql/cql/ql/ptree/parse_tree.cc
Original file line number Diff line number Diff line change
Expand Up @@ -60,10 +60,9 @@ CHECKED_STATUS ParseTree::Analyze(SemContext *sem_context) {
case 1: {
const TreeNode::SharedPtr tnode = lnode->node_list().front();
RETURN_NOT_OK(tnode->Analyze(sem_context));
if ((tnode->opcode() == TreeNodeOpcode::kPTInsertStmt ||
tnode->opcode() == TreeNodeOpcode::kPTUpdateStmt ||
tnode->opcode() == TreeNodeOpcode::kPTDeleteStmt) &&
std::static_pointer_cast<PTDmlStmt>(tnode)->RequireTransaction()) {
// If the statement is a DML and requires a transaction, wrap it around with START TRANSACTION
// and COMMIT.
if (tnode->IsDml() && std::static_pointer_cast<PTDmlStmt>(tnode)->RequiresTransaction()) {
lnode->Prepend(PTStartTransaction::MakeShared(sem_context->PTreeMem(), tnode->loc_ptr()));
lnode->Append(PTCommit::MakeShared(sem_context->PTreeMem(), tnode->loc_ptr()));
} else {
Expand Down
2 changes: 1 addition & 1 deletion src/yb/yql/cql/ql/ptree/pt_dml.cc
Original file line number Diff line number Diff line change
Expand Up @@ -225,7 +225,7 @@ CHECKED_STATUS PTDmlStmt::AnalyzeIndexesForWrites(SemContext *sem_context) {
return Status::OK();
}

bool PTDmlStmt::RequireTransaction() const {
bool PTDmlStmt::RequiresTransaction() const {
return IsWriteOp() && !DCHECK_NOTNULL(table_.get())->index_map().empty() &&
table_->InternalSchema().table_properties().is_transactional();
}
Expand Down
Loading

0 comments on commit 74751e8

Please sign in to comment.