Skip to content

Commit

Permalink
[YSQL] #1051: Bulk insert in initdb
Browse files Browse the repository at this point in the history
Summary: During initdb, rows are inserted into the system catalog tables and indexes. Those inserts can be executed in bulk to speed up initdb.

Test Plan: Create a local cluster with YSQL enabled. On a laptop with an SSD, cluster creation time drops from 25 seconds to 20 seconds.

Reviewers: mihnea, neha, mikhail

Reviewed By: mikhail

Subscribers: neha, yql

Differential Revision: https://phabricator.dev.yugabyte.com/D5970
  • Loading branch information
robertpang committed Mar 22, 2019
1 parent 987abc6 commit 8715be3
Show file tree
Hide file tree
Showing 14 changed files with 183 additions and 27 deletions.
5 changes: 5 additions & 0 deletions src/postgres/src/backend/access/ybc/ybcin.c
Original file line number Diff line number Diff line change
Expand Up @@ -59,11 +59,16 @@ ybcinbuild(Relation heap, Relation index, struct IndexInfo *indexInfo)

Assert(!index->rd_index->indisprimary);

/* Buffer the inserts into the index */
YBCStartBufferingWriteOperations();

/* Do the heap scan */
buildstate.index_tuples = 0;
heap_tuples = IndexBuildHeapScan(heap, index, indexInfo, true, ybcinbuildCallback,
&buildstate, NULL);

YBCFlushBufferedWriteOperations();

/*
* Return statistics
*/
Expand Down
26 changes: 26 additions & 0 deletions src/postgres/src/backend/bootstrap/bootparse.y
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,8 @@
#include "catalog/pg_authid.h"
#include "catalog/pg_class.h"
#include "catalog/pg_namespace.h"
#include "catalog/pg_proc.h"
#include "catalog/pg_type.h"
#include "catalog/pg_tablespace.h"
#include "catalog/toasting.h"
#include "commands/defrem.h"
Expand All @@ -52,6 +54,7 @@
#include "utils/rel.h"

#include "pg_yb_utils.h"
#include "executor/ybcModifyTable.h"
#include "bootstrap/ybcbootstrap.h"

/*
Expand Down Expand Up @@ -160,6 +163,11 @@ Boot_OpenStmt:
{
do_start();
boot_openrel($2);
if (IsYugaByteEnabled())
{
/* Buffer the inserts into the table */
YBCStartBufferingWriteOperations();
}
do_end();
}
;
Expand All @@ -169,6 +177,11 @@ Boot_CloseStmt:
{
do_start();
closerel($2);
if (IsYugaByteEnabled())
{
/* End the buffering of inserts and flush them */
YBCFlushBufferedWriteOperations();
}
do_end();
}
;
Expand Down Expand Up @@ -298,6 +311,19 @@ Boot_CreateStmt:
if (IsYugaByteEnabled())
{
YBCCreateSysCatalogTable($2, $3, tupdesc, shared_relation, $13);

/*
* Start buffering for pg_proc, pg_type, pg_attribute and pg_class
* explicitly. They are not opened explicitly in the generated
* postgres.bki so we need to start buffering here.
*/
if ($3 == ProcedureRelationId ||
$3 == TypeRelationId ||
$3 == AttributeRelationId ||
$3 == RelationRelationId)
{
YBCStartBufferingWriteOperations();
}
}

do_end();
Expand Down
10 changes: 10 additions & 0 deletions src/postgres/src/backend/executor/ybcModifyTable.c
Original file line number Diff line number Diff line change
Expand Up @@ -620,3 +620,13 @@ void YBCUpdateSysCatalogTuple(Relation rel, HeapTuple tuple)
HandleYBStatus(YBCPgDeleteStatement(update_stmt));
update_stmt = NULL;
}

/*
 * Ask pggate to start buffering write operations on the session held in
 * ybc_pg_session. The status returned by
 * YBCPgStartBufferingWriteOperations() is handed to HandleYBStatus().
 *
 * The parameter list is spelled (void) so that, in C, this definition is a
 * real prototype instead of an old-style declaration that accepts any
 * arguments.
 */
void YBCStartBufferingWriteOperations(void)
{
	HandleYBStatus(YBCPgStartBufferingWriteOperations(ybc_pg_session));
}

/*
 * Ask pggate to flush the write operations buffered on the session held in
 * ybc_pg_session. The status returned by
 * YBCPgFlushBufferedWriteOperations() is handed to HandleYBStatus().
 *
 * The parameter list is spelled (void) so that, in C, this definition is a
 * real prototype instead of an old-style declaration that accepts any
 * arguments.
 */
void YBCFlushBufferedWriteOperations(void)
{
	HandleYBStatus(YBCPgFlushBufferedWriteOperations(ybc_pg_session));
}
4 changes: 4 additions & 0 deletions src/postgres/src/include/executor/ybcModifyTable.h
Original file line number Diff line number Diff line change
Expand Up @@ -87,6 +87,10 @@ extern void YBCDeleteSysCatalogTuple(Relation rel, HeapTuple tuple);

extern void YBCUpdateSysCatalogTuple(Relation rel, HeapTuple tuple);

// Buffer write operations.
//
// YBCStartBufferingWriteOperations() starts buffering write operations on the current pggate
// session; YBCFlushBufferedWriteOperations() ends the buffering and flushes what was buffered.
// Both are declared with (void) so that C callers get a checked prototype.
extern void YBCStartBufferingWriteOperations(void);
extern void YBCFlushBufferedWriteOperations(void);

//------------------------------------------------------------------------------
// Utility methods.

Expand Down
18 changes: 9 additions & 9 deletions src/yb/yql/pggate/pg_dml_write.cc
Original file line number Diff line number Diff line change
Expand Up @@ -127,15 +127,15 @@ Status PgDmlWrite::Exec() {
// Set column references in protobuf.
SetColumnRefIds(table_desc_, write_req_->mutable_column_refs());

// Execute the statement.
RETURN_NOT_OK(doc_op_->Execute());

// Get result and handle any rows returned.
RETURN_NOT_OK(doc_op_->GetResult(&row_batch_));
if (!row_batch_.empty()) {
int64_t row_count = 0;
RETURN_NOT_OK(PgDocData::LoadCache(row_batch_, &row_count, &cursor_));
accumulated_row_count_ += row_count;
// Execute the statement. If the request has been sent, get the result and handle any rows
// returned.
if (VERIFY_RESULT(doc_op_->Execute()) == RequestSent::kTrue) {
RETURN_NOT_OK(doc_op_->GetResult(&row_batch_));
if (!row_batch_.empty()) {
int64_t row_count = 0;
RETURN_NOT_OK(PgDocData::LoadCache(row_batch_, &row_count, &cursor_));
accumulated_row_count_ += row_count;
}
}

return Status::OK();
Expand Down
31 changes: 23 additions & 8 deletions src/yb/yql/pggate/pg_doc_op.cc
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ Result<bool> PgDocOp::EndOfResult() const {
return !has_cached_data_ && end_of_data_;
}

Status PgDocOp::Execute() {
Result<RequestSent> PgDocOp::Execute() {
if (is_canceled_) {
return STATUS(IllegalState, "Operation canceled");
}
Expand All @@ -57,7 +57,9 @@ Status PgDocOp::Execute() {
// layer.
InitUnlocked(&lock);

return SendRequestUnlocked();
RETURN_NOT_OK(SendRequestUnlocked());

return RequestSent(waiting_for_response_);
}

void PgDocOp::InitUnlocked(std::unique_lock<std::mutex>* lock) {
Expand Down Expand Up @@ -145,10 +147,19 @@ void PgDocReadOp::InitUnlocked(std::unique_lock<std::mutex>* lock) {
}

Status PgDocReadOp::SendRequestUnlocked() {
RETURN_NOT_OK(pg_session_->PgApplyAsync(read_op_, read_time_));
CHECK(!waiting_for_response_);

SCHECK_EQ(VERIFY_RESULT(pg_session_->PgApplyAsync(read_op_, read_time_)), OpBuffered::kFalse,
IllegalState, "YSQL read operation should not be buffered");

waiting_for_response_ = true;
RETURN_NOT_OK(
pg_session_->PgFlushAsync([this](const Status& s) { PgDocReadOp::ReceiveResponse(s); }));
Status s = pg_session_->PgFlushAsync([this](const Status& s) {
PgDocReadOp::ReceiveResponse(s);
});
if (!s.ok()) {
waiting_for_response_ = false;
return s;
}
return Status::OK();
}

Expand Down Expand Up @@ -200,11 +211,15 @@ PgDocWriteOp::~PgDocWriteOp() {
Status PgDocWriteOp::SendRequestUnlocked() {
CHECK(!waiting_for_response_);

RETURN_NOT_OK(pg_session_->PgApplyAsync(write_op_, read_time_));
// If the op is buffered, we should not flush now. Just return.
if (VERIFY_RESULT(pg_session_->PgApplyAsync(write_op_, read_time_)) == OpBuffered::kTrue) {
return Status::OK();
}

waiting_for_response_ = true;
Status s = pg_session_->PgFlushAsync([this](const Status& s) {
PgDocWriteOp::ReceiveResponse(s);
});
PgDocWriteOp::ReceiveResponse(s);
});
if (!s.ok()) {
waiting_for_response_ = false;
return s;
Expand Down
12 changes: 8 additions & 4 deletions src/yb/yql/pggate/pg_doc_op.h
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,8 @@
namespace yb {
namespace pggate {

// Strongly-typed boolean returned by PgDocOp::Execute(): kTrue when the request has been sent and
// a result is awaited, kFalse otherwise.
YB_STRONGLY_TYPED_BOOL(RequestSent);

class PgDocOp {
public:
// Public types.
Expand All @@ -45,8 +47,10 @@ class PgDocOp {
explicit PgDocOp(PgSession::ScopedRefPtr pg_session, uint64_t* read_time);
virtual ~PgDocOp();

// Postgres Ops.
virtual CHECKED_STATUS Execute();
// Execute the op. Return true if the request has been sent and is awaiting the result.
virtual Result<RequestSent> Execute();

// Get the result of the op.
virtual CHECKED_STATUS GetResult(string *result_set);

// Access functions.
Expand Down Expand Up @@ -177,8 +181,8 @@ class PgDocCompoundOp : public PgDocOp {
explicit PgDocCompoundOp(PgSession::ScopedRefPtr pg_session);
virtual ~PgDocCompoundOp();

virtual CHECKED_STATUS Execute() {
return Status::OK();
virtual Result<RequestSent> Execute() {
return RequestSent::kTrue;
}
virtual CHECKED_STATUS GetResult(string *result_set) {
return Status::OK();
Expand Down
5 changes: 4 additions & 1 deletion src/yb/yql/pggate/pg_select.cc
Original file line number Diff line number Diff line change
Expand Up @@ -234,7 +234,10 @@ Status PgSelect::Exec() {
}

// Execute select statement asynchronously.
return doc_op_->Execute();
SCHECK_EQ(VERIFY_RESULT(doc_op_->Execute()), RequestSent::kTrue, IllegalState,
"YSQL read operation was not sent");

return Status::OK();
}

} // namespace pggate
Expand Down
56 changes: 54 additions & 2 deletions src/yb/yql/pggate/pg_session.cc
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ using client::YBSession;
using client::YBMetaDataCache;
using client::YBSchema;
using client::YBColumnSchema;
using client::YBOperation;
using client::YBTable;
using client::YBTableName;
using client::YBTableType;
Expand Down Expand Up @@ -382,7 +383,56 @@ void PgSession::InvalidateTableCache(const PgObjectId& table_id) {
table_cache_.erase(yb_table_id);
}

Status PgSession::PgApplyAsync(const std::shared_ptr<client::YBPgsqlOp>& op, uint64_t* read_time) {
// Turns on buffering of write operations for this session. Buffering cannot be started twice:
// if it is already on, an IllegalState status is returned and nothing changes.
Status PgSession::StartBufferingWriteOperations() {
  if (!buffer_write_ops_) {
    buffer_write_ops_ = true;
    return Status::OK();
  }
  return STATUS(IllegalState, "Buffering write operations already");
}

// Ends write buffering and sends every buffered write op in one flush, blocking on a Synchronizer
// until the flush callback reports a status.
//
// Returns IllegalState if buffering is not currently on. Otherwise returns OK when nothing was
// buffered, or the combined status of the flush and the session's pending errors.
//
// Whatever the outcome, the session is no longer in buffering mode and holds no buffered ops when
// this function returns.
Status PgSession::FlushBufferedWriteOperations() {
  if (!buffer_write_ops_) {
    return STATUS(IllegalState, "Not buffering write operations currently");
  }

  // Leave buffering mode and take ownership of the pending ops before any step that can fail.
  // Otherwise an early error return below would leave the flag set and the stale ops queued, and
  // every later StartBufferingWriteOperations() would fail with "already buffering".
  buffer_write_ops_ = false;
  decltype(buffered_write_ops_) pending_ops;
  pending_ops.swap(buffered_write_ops_);
  if (pending_ops.empty()) {
    return Status::OK();
  }

  // Only non-transactional ops should be buffered currently.
  client::YBSessionPtr session =
      VERIFY_RESULT(GetSession(false /* transactional */,
                               false /* read_only_op */))->shared_from_this();
  for (const auto& op : pending_ops) {
    DCHECK(!op->IsTransactional());
    RETURN_NOT_OK(session->Apply(op));
  }

  // The callback holds its own reference to the session, and we block on sync.Wait() right below
  // until the callback has run.
  Synchronizer sync;
  StatusFunctor callback = sync.AsStatusFunctor();
  session->FlushAsync([this, session, callback] (const Status& status) {
    callback(CombineErrorsToStatus(session->GetPendingErrors(), status));
  });
  return sync.Wait();
}

Result<OpBuffered> PgSession::PgApplyAsync(const std::shared_ptr<client::YBPgsqlOp>& op,
uint64_t* read_time) {
// If the operation is a write op and we are in buffered write mode, save the op and return
// OpBuffered::kTrue to indicate the op should not be flushed except in bulk by
// FlushBufferedWriteOperations().
//
// We allow read ops while buffering writes because it can happen when building indexes for sys
// catalog tables during initdb. Continuing read ops to scan the table can be issued while
// writes to its index are being buffered.
if (buffer_write_ops_ && op->type() == YBOperation::Type::PGSQL_WRITE) {
if (op->IsTransactional()) {
return STATUS(IllegalState, "Only non-transactional ops should be buffered");
}
buffered_write_ops_.push_back(op);
return OpBuffered::kTrue;
}

if (op->IsTransactional()) {
has_txn_ops_ = true;
} else {
Expand All @@ -396,7 +446,9 @@ Status PgSession::PgApplyAsync(const std::shared_ptr<client::YBPgsqlOp>& op, uin
}
session->SetInTxnLimit(HybridTime(*read_time));
}
return session->Apply(op);
RETURN_NOT_OK(session->Apply(op));

return OpBuffered::kFalse;
}

Status PgSession::PgFlushAsync(StatusFunctor callback) {
Expand Down
18 changes: 16 additions & 2 deletions src/yb/yql/pggate/pg_session.h
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,8 @@
namespace yb {
namespace pggate {

// Strongly-typed boolean returned by PgSession::PgApplyAsync(): kTrue when a write op was only
// buffered (to be sent later by FlushBufferedWriteOperations()), kFalse when the op was applied
// to the underlying session.
YB_STRONGLY_TYPED_BOOL(OpBuffered);

class PgTxnManager;

// This class is not thread-safe as it is mostly used by a single-threaded PostgreSQL backend
Expand Down Expand Up @@ -115,8 +117,16 @@ class PgSession : public RefCountedThreadSafe<PgSession> {
Result<PgTableDesc::ScopedRefPtr> LoadTable(const PgObjectId& table_id);
void InvalidateTableCache(const PgObjectId& table_id);

// Apply the given operation to read and write database content.
CHECKED_STATUS PgApplyAsync(const std::shared_ptr<client::YBPgsqlOp>& op, uint64_t* read_time);
// Buffer write operations.
// StartBufferingWriteOperations() turns buffering on and returns IllegalState if it is already on.
// FlushBufferedWriteOperations() turns buffering off and flushes the buffered write ops in bulk;
// it returns IllegalState if buffering is not on.
CHECKED_STATUS StartBufferingWriteOperations();
CHECKED_STATUS FlushBufferedWriteOperations();

// Apply the given operation to read and write database content. If the operation is a write
// op, return true if the operation is buffered and should not be flushed except in bulk
// by FlushBufferedWriteOperations(). False otherwise.
Result<OpBuffered> PgApplyAsync(const std::shared_ptr<client::YBPgsqlOp>& op,
uint64_t* read_time);

CHECKED_STATUS PgFlushAsync(StatusFunctor callback);
CHECKED_STATUS RestartTransaction();
bool HasAppliedOperations() const;
Expand Down Expand Up @@ -203,6 +213,10 @@ class PgSession : public RefCountedThreadSafe<PgSession> {

std::unordered_map<TableId, std::shared_ptr<client::YBTable>> table_cache_;

// Should write operations be buffered? Set by StartBufferingWriteOperations() and cleared by
// FlushBufferedWriteOperations(). While set, PgApplyAsync() queues non-transactional write ops
// in buffered_write_ops_ instead of applying them, and rejects transactional write ops.
bool buffer_write_ops_ = false;
// Write ops queued while buffering is on; applied and flushed together by
// FlushBufferedWriteOperations().
std::vector<std::shared_ptr<client::YBPgsqlOp>> buffered_write_ops_;

bool has_txn_ops_ = false;
bool has_non_txn_ops_ = false;
};
Expand Down
8 changes: 8 additions & 0 deletions src/yb/yql/pggate/pggate.cc
Original file line number Diff line number Diff line change
Expand Up @@ -582,6 +582,14 @@ Status PgApiImpl::DmlFetch(PgStatement *handle, int32_t natts, uint64_t *values,
return down_cast<PgDml*>(handle)->Fetch(natts, values, isnulls, syscols, has_data);
}

// Starts buffering write operations on the given session by forwarding to
// PgSession::StartBufferingWriteOperations() and returning its status unchanged.
Status PgApiImpl::StartBufferingWriteOperations(PgSession *pg_session) {
  Status start_status = pg_session->StartBufferingWriteOperations();
  return start_status;
}

// Flushes the write operations buffered on the given session by forwarding to
// PgSession::FlushBufferedWriteOperations() and returning its status unchanged.
Status PgApiImpl::FlushBufferedWriteOperations(PgSession *pg_session) {
  Status flush_status = pg_session->FlushBufferedWriteOperations();
  return flush_status;
}

Status PgApiImpl::DmlExecWriteOp(PgStatement *handle) {
switch (handle->stmt_op()) {
case StmtOp::STMT_INSERT:
Expand Down
4 changes: 4 additions & 0 deletions src/yb/yql/pggate/pggate.h
Original file line number Diff line number Diff line change
Expand Up @@ -304,6 +304,10 @@ class PgApiImpl {
// - API for "order_by_expr"
// - API for "group_by_expr"

// Buffer write operations.
// Both calls forward to the method of the same name on the given PgSession and return its status.
CHECKED_STATUS StartBufferingWriteOperations(PgSession *pg_session);
CHECKED_STATUS FlushBufferedWriteOperations(PgSession *pg_session);

//------------------------------------------------------------------------------------------------
// Insert.
CHECKED_STATUS NewInsert(PgSession *pg_session,
Expand Down
8 changes: 7 additions & 1 deletion src/yb/yql/pggate/ybc_pggate.cc
Original file line number Diff line number Diff line change
Expand Up @@ -424,12 +424,18 @@ YBCStatus YBCPgDmlFetch(YBCPgStatement handle, int32_t natts, uint64_t *values,
return ToYBCStatus(pgapi->DmlFetch(handle, natts, values, isnulls, syscols, has_data));
}

// C entry point: starts buffering write operations on pg_session via the global pgapi and
// converts the resulting Status with ToYBCStatus().
YBCStatus YBCPgStartBufferingWriteOperations(YBCPgSession pg_session) {
  const auto start_status = pgapi->StartBufferingWriteOperations(pg_session);
  return ToYBCStatus(start_status);
}

// C entry point: flushes the write operations buffered on pg_session via the global pgapi and
// converts the resulting Status with ToYBCStatus().
YBCStatus YBCPgFlushBufferedWriteOperations(YBCPgSession pg_session) {
  const auto flush_status = pgapi->FlushBufferedWriteOperations(pg_session);
  return ToYBCStatus(flush_status);
}

// C entry point: executes the write statement referenced by handle via the global pgapi and
// converts the resulting Status with ToYBCStatus().
YBCStatus YBCPgDmlExecWriteOp(YBCPgStatement handle) {
  const auto exec_status = pgapi->DmlExecWriteOp(handle);
  return ToYBCStatus(exec_status);
}


// INSERT Operations -------------------------------------------------------------------------------
YBCStatus YBCPgNewInsert(YBCPgSession pg_session,
const YBCPgOid database_oid,
Expand Down
Loading

0 comments on commit 8715be3

Please sign in to comment.