PostgreSQL Support
Summary:
This diff implements support for PostgreSQL. It has three main sections.
- PostgreSQL utility functions in C++
- Compiler
- Server Support - DocDB, Tablet, and Master.

==============================================
PostgreSQL utility
==============================================
We convert PostgreSQL code from C into C++ libraries and use it for two main purposes.
It is implemented in /yb/yql/pgsql/ybpostgres.
- Processing messages coming from PostgreSQL clients.
- Constructing messages to send to PostgreSQL clients.
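
To make the first bullet concrete, here is a minimal sketch of the framing that this message processing has to handle: in the PostgreSQL v3 wire protocol, every regular frontend message is a 1-byte type tag followed by a 4-byte big-endian length that includes itself but not the tag. The names below (PgFrontendMessage, TryParseMessage) are illustrative only and are not the actual ybpostgres API.

```cpp
// Illustrative sketch of PostgreSQL v3 frontend message framing; not the
// actual ybpostgres interface.
#include <cstddef>
#include <cstdint>
#include <string>

struct PgFrontendMessage {
  char type;            // message type tag, e.g. 'Q' = simple query, 'X' = terminate
  std::string payload;  // message body, (length - 4) bytes
};

// Returns true if a complete message could be parsed out of buf[0..len).
bool TryParseMessage(const uint8_t* buf, size_t len, PgFrontendMessage* out) {
  if (len < 5) return false;  // need the tag plus the 4-byte length
  const uint32_t msg_len = (uint32_t{buf[1]} << 24) | (uint32_t{buf[2]} << 16) |
                           (uint32_t{buf[3]} << 8) | uint32_t{buf[4]};
  if (msg_len < 4) return false;        // malformed length field
  if (len < 1 + msg_len) return false;  // wait for more bytes from the client
  out->type = static_cast<char>(buf[0]);
  out->payload.assign(reinterpret_cast<const char*>(buf + 5), msg_len - 4);
  return true;
}
```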

==============================================
Compiler Implementation
==============================================
This diff compiles PostgreSQL statements into protobuf code, which is then executed by DocDB.

A. The database objects.

- The "TABLE" is using "OID BIGINT" as hash key.  When the table doesn't have its own primary key, "CTID TIMEUUID" is used as primary key. During insertion, the value "0" is always inserted into "OID" column. For database accessing, the conditional clause "WHERE oid = 0" would be attached to all statements.

B. Implementation:
The implementation for PostgreSQL support is divided into several components, each of which is developed in one directory and built into one library. The following notes describe the content and purpose of each library.

(0) The yql/pgsql/processor library
- This library defines the API for the compilation and execution processes.
- This is the driver of all SQL statement processing.
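
A rough sketch of what such a compile-then-execute driver could look like; the class and method names (PgProcessor, Compile, Execute, PgsqlProgramPB) are assumptions for illustration, not the actual yql/pgsql/processor interface.

```cpp
// Illustrative driver sketch: parse and analyze a statement, lower it to
// protobuf, then hand the protobuf to the server side for execution.
#include <memory>
#include <string>

struct ParseTree {};       // produced by the syn/sem libraries
struct PgsqlProgramPB {};  // protobuf emitted by pbgen and executed by DocDB

class PgProcessor {
 public:
  // Parse and analyze the statement, then lower it to protobuf.
  std::unique_ptr<PgsqlProgramPB> Compile(const std::string& sql_stmt) {
    ParseTree tree = Parse(sql_stmt);
    Analyze(&tree);
    return Codegen(tree);
  }

  // Ship the generated protobuf to the tablet server (or the master for DDL).
  void Execute(const PgsqlProgramPB& program) { /* RPC to the server */ }

 private:
  ParseTree Parse(const std::string& sql_stmt) { return ParseTree{}; }
  void Analyze(ParseTree* tree) {}
  std::unique_ptr<PgsqlProgramPB> Codegen(const ParseTree& tree) {
    return std::make_unique<PgsqlProgramPB>();
  }
};
```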

(1) The yql/pgsql/ybpostgres library
- This library contains various utility functions that were developed in the original PostgreSQL code.
- Yugabyte components call the utility functions in this library to perform their tasks.
- The code has been converted to C++ style, but its content originates from PostgreSQL.

(2) The yql/pgsql/util library
- This library contains various utility functions that were developed by Yugabyte.

(3) The yql/pgsql/syn library
- This library contains the parser that compiles SQL statements into a parse tree.
- We modify the original PostgreSQL lex and yacc files to generate our own tree nodes.

(4) The yql/pgsql/sem library
- This library runs semantic analysis.

(5) The yql/pgsql/pbgen library
- This library traverses the parse tree to generate protobuf code for all statements except DDL statements.
- The generated protobuf code can be cached in the tablet server with a statement ID for future use.
- We might consider generating PBs for DDL statements as well; the executor would then just send these PBs to the master server for execution.
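
A sketch of the statement-ID caching idea mentioned above, assuming a simple map from statement ID to generated protobuf; the names and types are illustrative, not the actual tablet-server cache.

```cpp
// Illustrative statement cache: compiled protobuf programs are stored under a
// statement ID so they can be reused without recompiling the SQL text.
#include <string>
#include <unordered_map>
#include <utility>

struct PgsqlProgramPB {};  // stand-in for the protobuf generated by pbgen

class StatementCache {
 public:
  // Returns the cached program, or nullptr if the statement was never compiled.
  const PgsqlProgramPB* Lookup(const std::string& stmt_id) const {
    auto it = cache_.find(stmt_id);
    return it == cache_.end() ? nullptr : &it->second;
  }

  void Insert(const std::string& stmt_id, PgsqlProgramPB program) {
    cache_[stmt_id] = std::move(program);
  }

 private:
  std::unordered_map<std::string, PgsqlProgramPB> cache_;
};
```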

(6) The yql/pgsql/pbexec library
- For DDL statements, this library executes the tree nodes directly.
- For non-DDL statements, or statements that can be prepared, this library executes the generated protobuf code.
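
A sketch of this execution split; the names are illustrative, not the actual yql/pgsql/pbexec interface.

```cpp
// Illustrative executor with the two paths described above.
struct TreeNode {};        // DDL statements are executed from the tree directly
struct PgsqlProgramPB {};  // prepared / non-DDL statements run from protobuf

class PgExecutor {
 public:
  // DDL path: interpret the analyzed tree node and call the master server.
  void ExecDdl(const TreeNode& ddl_stmt) { /* send metadata RPCs to the master */ }

  // Non-DDL path: ship the generated protobuf to the tablet server.
  void ExecProgram(const PgsqlProgramPB& program) { /* send read/write RPCs */ }
};
```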

(7) The yql/pgsql/ptree library
- This library defines the tree nodes.
- For coding convenience, the actual semantic analysis for each tree node is defined in that node's Analyze() method. The semantic analyzer calls these methods when processing statements.
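
A sketch of this per-node Analyze() pattern; the class names below are illustrative stand-ins, not the actual yql/pgsql/ptree classes.

```cpp
// Illustrative tree-node hierarchy: each node analyzes itself, and the
// semantic analyzer simply walks the tree and calls Analyze() on every node.
#include <memory>
#include <vector>

class SemContext;  // carries symbol tables, error state, etc.

class TreeNode {
 public:
  virtual ~TreeNode() = default;
  virtual void Analyze(SemContext* sem_context) = 0;
};

class PTSelectStmt : public TreeNode {
 public:
  void Analyze(SemContext* sem_context) override {
    for (auto& target : targets_) {
      target->Analyze(sem_context);  // recurse into child expressions
    }
  }

 private:
  std::vector<std::unique_ptr<TreeNode>> targets_;
};
```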

(8) The yql/pgsql/proto library
- This library defines the YugaByte protobuf code to be executed by the tablet server.
- In the future, the tablet server should cache this PB code so that the compilation steps can be skipped.

(9) The yb/util/bf_pgsql library
- This library will be moved to the "pgsql" directory later.
- This library defines all supported functions and operators in PostgreSQL.
- Operators such as +, -, /, *, =, >, >=, <, and <= are all specified and defined here.
- Utility functions such as sin, cos, and power are specified and defined here.
- Database-dependent functions such as aggregate functions are specified here, but their execution code is defined in the tablet server.
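
A sketch of the kind of builtin table this library describes: each operator or function name maps to a signature and, for the scalar primitives, an execution routine, while aggregates are only declared here and executed by the tablet server. The types below are illustrative, not the generated bfpg code.

```cpp
// Illustrative builtin-function table; not the actual bf_pgsql definitions.
#include <cmath>
#include <functional>
#include <map>
#include <string>
#include <vector>

struct BuiltinDescriptor {
  std::vector<std::string> param_types;
  std::string return_type;
  std::function<double(double, double)> exec;  // empty for server-side builtins
};

const std::map<std::string, BuiltinDescriptor>& BuiltinTable() {
  static const std::map<std::string, BuiltinDescriptor> table = {
    {"+", BuiltinDescriptor{{"double", "double"}, "double",
                            [](double a, double b) { return a + b; }}},
    {"power", BuiltinDescriptor{{"double", "double"}, "double",
                                [](double a, double b) { return std::pow(a, b); }}},
    // Aggregate: only the signature lives here; execution is in the tablet server.
    {"sum", BuiltinDescriptor{{"double"}, "double", nullptr}},
  };
  return table;
}
```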

(10) The protocols in "yb/common"
- The file "pgsql_protocol.proto" contains the protocol for communication between the proxy and tablet server.

==============================================
DocDB, Tablet, and Master Implementation
==============================================
- PGSQL types are added to distinguish clients, databases, schemas, tables, messages, and other DB entities from their CQL and Redis counterparts.
- The rest of the implementation is similar to, or the same as, that for CQL and Redis.
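
As the client diffs below show, the new PGSQL variant is threaded through the existing operation- and table-type enums so each layer can tell PostgreSQL entities apart from CQL and Redis ones. A simplified illustration of the idea follows; the real enums live in the client and common libraries, and this is not their actual definition.

```cpp
// Simplified illustration only: a table-type enum gains a PGSQL variant.
enum class YBTableType {
  YQL_TABLE_TYPE,    // Apache Cassandra-compatible CQL tables
  REDIS_TABLE_TYPE,  // Redis-compatible tables
  PGSQL_TABLE_TYPE,  // new in this diff: PostgreSQL tables
};
```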

Test Plan:
Will be done in my next diff.

Reviewers: mihnea, robert

Reviewed By: robert

Subscribers: mikhail, eng

Differential Revision: https://phabricator.dev.yugabyte.com/D4245
nocaway committed Apr 19, 2018
1 parent 7b12cce commit 49c9d4a
Showing 189 changed files with 28,424 additions and 500 deletions.
4 changes: 2 additions & 2 deletions .arclint
@@ -1,5 +1,5 @@
{
"exclude":"(\\.l$|Makefile$|src/yb/common/ql_type.h$|src/yb/yql/cql/ql/kwlist.h$)",
"exclude":"(\\.l$|Makefile$|src/yb/common/ql_type.h$)",
"linters": {
"text": {
"type": "text",
@@ -20,7 +20,7 @@
},
"googleccplint": {
"type": "googlecpplint",
"exclude": "(src/yb/yql/cql/ql/kwlist.h)",
"exclude": "(src/yb/yql/cql/ql/kwlist.h|src/yb/yql/pgsql/syn/pg_kwlist.h)",
"include": [
"(\\.cpp$)",
"(\\.cc$)",
2 changes: 2 additions & 0 deletions CMakeLists.txt
@@ -324,6 +324,7 @@ function(add_executable name)
if (NOT "${YB_EXECUTABLE_FILTER_RE}" STREQUAL "" AND
NOT ${YB_ADDING_TEST_EXECUTABLE} AND
NOT "${name}" STREQUAL "bfql_codegen" AND
NOT "${name}" STREQUAL "bfpg_codegen" AND
NOT "${name}" STREQUAL "run-with-timeout" AND
NOT "${name}" STREQUAL "protoc-gen-insertions" AND
NOT "${name}" STREQUAL "protoc-gen-yrpc")
@@ -1639,6 +1640,7 @@ add_subdirectory(src/yb/fs)
add_subdirectory(src/yb/server)
add_subdirectory(src/yb/tablet)
add_subdirectory(src/yb/util/bfql)
add_subdirectory(src/yb/util/bfpg)
add_subdirectory(src/yb/rpc)
add_subdirectory(src/yb/tserver)
add_subdirectory(src/yb/consensus)
95 changes: 84 additions & 11 deletions src/yb/client/async_rpc.cc
@@ -168,6 +168,13 @@ void AsyncRpc::Failed(const Status& status) {
resp->set_error_message(error_message);
break;
}
case YBOperation::Type::PGSQL_READ: FALLTHROUGH_INTENDED;
case YBOperation::Type::PGSQL_WRITE: {
PgsqlResponsePB* resp = down_cast<YBPgsqlOp*>(yb_op)->mutable_response();
resp->set_status(PgsqlResponsePB::PGSQL_STATUS_RUNTIME_ERROR);
resp->set_error_message(error_message);
break;
}
default:
LOG(FATAL) << "Unsupported operation " << yb_op->type();
break;
@@ -269,6 +276,8 @@ WriteRpc::WriteRpc(
switch (op->yb_op->type()) {
case YBOperation::QL_READ: FALLTHROUGH_INTENDED;
case YBOperation::QL_WRITE: FALLTHROUGH_INTENDED;
case YBOperation::PGSQL_READ: FALLTHROUGH_INTENDED;
case YBOperation::PGSQL_WRITE: FALLTHROUGH_INTENDED;
case YBOperation::REDIS_READ: FALLTHROUGH_INTENDED;
case YBOperation::REDIS_WRITE: {
CHECK_OK(op->yb_op->GetPartitionKey(&partition_key));
@@ -283,23 +292,28 @@
<< " partition_key: '" << Slice(partition_key).ToDebugHexString() << "'";

#endif
// Move write request PB into tserver write request PB for performance.
// Will restore in ProcessResponseFromTserver.
switch (op->yb_op->type()) {
case YBOperation::Type::REDIS_WRITE: {
CHECK_EQ(table()->table_type(), YBTableType::REDIS_TABLE_TYPE);
// Move Redis write request PB into tserver write request PB for performance. Will restore
// in ProcessResponseFromTserver.
auto* redis_op = down_cast<YBRedisWriteOp*>(op->yb_op.get());
req_.add_redis_write_batch()->Swap(redis_op->mutable_request());
break;
}
case YBOperation::Type::QL_WRITE: {
CHECK_EQ(table()->table_type(), YBTableType::YQL_TABLE_TYPE);
// Move QL write request PB into tserver write request PB for performance. Will restore
// in ProcessResponseFromTserver.
auto* ql_op = down_cast<YBqlWriteOp*>(op->yb_op.get());
req_.add_ql_write_batch()->Swap(ql_op->mutable_request());
break;
}
case YBOperation::Type::PGSQL_WRITE: {
CHECK_EQ(table()->table_type(), YBTableType::PGSQL_TABLE_TYPE);
auto* pgsql_op = down_cast<YBPgsqlWriteOp*>(op->yb_op.get());
req_.add_pgsql_write_batch()->Swap(pgsql_op->mutable_request());
break;
}
case YBOperation::Type::PGSQL_READ: FALLTHROUGH_INTENDED;
case YBOperation::Type::REDIS_READ: FALLTHROUGH_INTENDED;
case YBOperation::Type::QL_READ:
LOG(FATAL) << "Not a write operation " << op->yb_op->type();
@@ -359,6 +373,7 @@ void WriteRpc::ProcessResponseFromTserver(const Status& status) {

size_t redis_idx = 0;
size_t ql_idx = 0;
size_t pgsql_idx = 0;
// Retrieve Redis and QL responses and make sure we received all the responses back.
for (auto& op : ops_) {
YBOperation* yb_op = op->yb_op.get();
@@ -394,7 +409,27 @@
ql_idx++;
break;
}

case YBOperation::Type::PGSQL_WRITE: {
if (pgsql_idx >= resp_.pgsql_response_batch().size()) {
batcher_->AddOpCountMismatchError();
return;
}
// Restore PGSQL write request PB and extract response.
auto* pgsql_op = down_cast<YBPgsqlWriteOp*>(yb_op);
pgsql_op->mutable_request()->Swap(req_.mutable_pgsql_write_batch(pgsql_idx));
pgsql_op->mutable_response()->Swap(resp_.mutable_pgsql_response_batch(pgsql_idx));
const auto& pgsql_response = pgsql_op->response();
if (pgsql_response.has_rows_data_sidecar()) {
Slice rows_data;
CHECK_OK(retrier().controller().GetSidecar(
pgsql_response.rows_data_sidecar(), &rows_data));
down_cast<YBPgsqlWriteOp*>(yb_op)->mutable_rows_data()->assign(
util::to_char_ptr(rows_data.data()), rows_data.size());
}
pgsql_idx++;
break;
}
case YBOperation::Type::PGSQL_READ: FALLTHROUGH_INTENDED;
case YBOperation::Type::REDIS_READ: FALLTHROUGH_INTENDED;
case YBOperation::Type::QL_READ:
LOG(FATAL) << "Not a write operation " << op->yb_op->type();
Expand All @@ -403,12 +438,15 @@ void WriteRpc::ProcessResponseFromTserver(const Status& status) {
}

if (redis_idx != resp_.redis_response_batch().size() ||
ql_idx != resp_.ql_response_batch().size()) {
ql_idx != resp_.ql_response_batch().size() ||
pgsql_idx != resp_.pgsql_response_batch().size()) {
LOG(ERROR) << Substitute("Write response count mismatch: "
"$0 Redis requests sent, $1 responses received. "
"$2 QL requests sent, $3 responses received.",
"$2 Apache CQL requests sent, $3 responses received. "
"$4 PostgreSQL requests sent, $5 responses received.",
redis_idx, resp_.redis_response_batch().size(),
ql_idx, resp_.ql_response_batch().size());
ql_idx, resp_.ql_response_batch().size(),
pgsql_idx, resp_.pgsql_response_batch().size());
batcher_->AddOpCountMismatchError();
Failed(STATUS(IllegalState, "Write response count mismatch"));
}
@@ -443,6 +481,16 @@ ReadRpc::ReadRpc(
}
break;
}
case YBOperation::Type::PGSQL_READ: {
CHECK_EQ(table()->table_type(), YBTableType::PGSQL_TABLE_TYPE);
auto* pgsql_op = down_cast<YBPgsqlReadOp*>(op->yb_op.get());
req_.add_pgsql_batch()->Swap(pgsql_op->mutable_request());
if (pgsql_op->read_time()) {
pgsql_op->read_time().AddToPB(&req_);
}
break;
}
case YBOperation::Type::PGSQL_WRITE: FALLTHROUGH_INTENDED;
case YBOperation::Type::REDIS_WRITE: FALLTHROUGH_INTENDED;
case YBOperation::Type::QL_WRITE:
LOG(FATAL) << "Not a read operation " << op->yb_op->type();
@@ -497,6 +545,7 @@ void ReadRpc::ProcessResponseFromTserver(const Status& status) {
// Retrieve Redis and QL responses and make sure we received all the responses back.
size_t redis_idx = 0;
size_t ql_idx = 0;
size_t pgsql_idx = 0;
for (auto& op : ops_) {
YBOperation* yb_op = op->yb_op.get();
switch (yb_op->type()) {
@@ -531,6 +580,27 @@
ql_idx++;
break;
}
case YBOperation::Type::PGSQL_READ: {
if (pgsql_idx >= resp_.pgsql_batch().size()) {
batcher_->AddOpCountMismatchError();
return;
}
// Restore PGSQL read request PB and extract response.
auto* pgsql_op = down_cast<YBPgsqlReadOp*>(yb_op);
pgsql_op->mutable_request()->Swap(req_.mutable_pgsql_batch(pgsql_idx));
pgsql_op->mutable_response()->Swap(resp_.mutable_pgsql_batch(pgsql_idx));
const auto& pgsql_response = pgsql_op->response();
if (pgsql_response.has_rows_data_sidecar()) {
Slice rows_data;
CHECK_OK(retrier().controller().GetSidecar(
pgsql_response.rows_data_sidecar(), &rows_data));
down_cast<YBPgsqlReadOp*>(yb_op)->mutable_rows_data()->assign(
util::to_char_ptr(rows_data.data()), rows_data.size());
}
pgsql_idx++;
break;
}
case YBOperation::Type::PGSQL_WRITE: FALLTHROUGH_INTENDED;
case YBOperation::Type::REDIS_WRITE: FALLTHROUGH_INTENDED;
case YBOperation::Type::QL_WRITE:
LOG(FATAL) << "Not a read operation " << op->yb_op->type();
Expand All @@ -539,12 +609,15 @@ void ReadRpc::ProcessResponseFromTserver(const Status& status) {
}

if (redis_idx != resp_.redis_batch().size() ||
ql_idx != resp_.ql_batch().size()) {
ql_idx != resp_.ql_batch().size() ||
pgsql_idx != resp_.pgsql_batch().size()) {
LOG(ERROR) << Substitute("Read response count mismatch: "
"$0 Redis requests sent, $1 responses received. "
"$2 QL requests sent, $3 responses received.",
"$2 QL requests sent, $3 responses received. "
"$4 QL requests sent, $5 responses received.",
redis_idx, resp_.redis_batch().size(),
ql_idx, resp_.ql_batch().size());
ql_idx, resp_.ql_batch().size(),
pgsql_idx, resp_.pgsql_batch().size());
batcher_->AddOpCountMismatchError();
Failed(STATUS(IllegalState, "Read response count mismatch"));
}
23 changes: 19 additions & 4 deletions src/yb/client/batcher.cc
@@ -250,20 +250,35 @@ Status Batcher::Add(shared_ptr<YBOperation> yb_op) {
in_flight_op->yb_op = yb_op;
in_flight_op->state = InFlightOpState::kLookingUpTablet;

if (yb_op->type() == YBOperation::Type::QL_READ) {
switch (yb_op->type()) {
case YBOperation::Type::QL_READ:
if (!in_flight_op->partition_key.empty()) {
down_cast<YBqlOp *>(yb_op.get())->SetHashCode(
PartitionSchema::DecodeMultiColumnHashValue(in_flight_op->partition_key));
}
} else if (yb_op->type() == YBOperation::Type::QL_WRITE) {
break;
case YBOperation::Type::QL_WRITE:
down_cast<YBqlOp*>(yb_op.get())->SetHashCode(
PartitionSchema::DecodeMultiColumnHashValue(in_flight_op->partition_key));
} else if (yb_op->type() == YBOperation::Type::REDIS_READ) {
break;
case YBOperation::Type::REDIS_READ:
down_cast<YBRedisReadOp*>(yb_op.get())->SetHashCode(
PartitionSchema::DecodeMultiColumnHashValue(in_flight_op->partition_key));
} else if (yb_op->type() == YBOperation::Type::REDIS_WRITE) {
break;
case YBOperation::Type::REDIS_WRITE:
down_cast<YBRedisWriteOp*>(yb_op.get())->SetHashCode(
PartitionSchema::DecodeMultiColumnHashValue(in_flight_op->partition_key));
break;
case YBOperation::Type::PGSQL_READ:
if (!in_flight_op->partition_key.empty()) {
down_cast<YBPgsqlOp *>(yb_op.get())->SetHashCode(
PartitionSchema::DecodeMultiColumnHashValue(in_flight_op->partition_key));
}
break;
case YBOperation::Type::PGSQL_WRITE:
down_cast<YBPgsqlOp*>(yb_op.get())->SetHashCode(
PartitionSchema::DecodeMultiColumnHashValue(in_flight_op->partition_key));
break;
}

AddInFlightOp(in_flight_op);