Skip to content

Commit

Permalink
Differentiate non-query SQL
Browse files Browse the repository at this point in the history
  • Loading branch information
cq-cdy committed Feb 13, 2024
1 parent 97264b7 commit 6a56f65
Show file tree
Hide file tree
Showing 5 changed files with 76 additions and 32 deletions.
35 changes: 28 additions & 7 deletions src/common/bustub_instance.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -172,14 +172,30 @@ see the execution plan of your query.
}

auto BustubInstance::ExecuteSql(const std::string &sql, ResultWriter &writer) -> bool {
bustub::Binder binder(*catalog_);
binder.ParseAndSave(sql);
for (auto *stmt : binder.statement_nodes_) {
auto statement = binder.BindStatement(stmt);
if (statement->type_ == StatementType::SELECT_STATEMENT) {
isSELECTsql = true;
break;
}
}
if(!isSELECTsql){
// todo commit to raft;

}
isSELECTsql = false;
auto txn = txn_manager_->Begin();
auto result = ExecuteSqlTxn(sql, writer, txn);
auto result = ExecuteSqlTxn(sql, writer, txn, binder);

txn_manager_->Commit(txn);
delete txn;
return result;
}

auto BustubInstance::ExecuteSqlTxn(const std::string &sql, ResultWriter &writer, Transaction *txn) -> bool {
auto BustubInstance::ExecuteSqlTxn(const std::string &sql, ResultWriter &writer, Transaction *txn, Binder &binder)
-> bool {
if (!sql.empty() && sql[0] == '\\') {
// Internal meta-commands, like in `psql`.
if (sql == "\\dt") {
Expand All @@ -190,7 +206,7 @@ auto BustubInstance::ExecuteSqlTxn(const std::string &sql, ResultWriter &writer,
CmdDisplayIndices(writer);
return true;
}

if (sql == "\\help") {
CmdDisplayHelp(writer);
return true;
Expand All @@ -201,16 +217,18 @@ auto BustubInstance::ExecuteSqlTxn(const std::string &sql, ResultWriter &writer,
bool is_successful = true;

std::shared_lock<std::shared_mutex> l(catalog_lock_);
bustub::Binder binder(*catalog_);
binder.ParseAndSave(sql);
// bustub::Binder binder(*catalog_);
//binder.ParseAndSave(sql);
l.unlock();

for (auto *stmt : binder.statement_nodes_) {
auto statement = binder.BindStatement(stmt);
if (statement->type_ == StatementType::SELECT_STATEMENT) {
isSELECTsql = true;
}
switch (statement->type_) {
case StatementType::CREATE_STATEMENT: {
const auto &create_stmt = dynamic_cast<const CreateStatement &>(*statement);

std::unique_lock<std::shared_mutex> l(catalog_lock_);
auto info = catalog_->CreateTable(txn, create_stmt.table_, Schema(create_stmt.columns_));
l.unlock();
Expand Down Expand Up @@ -250,17 +268,20 @@ auto BustubInstance::ExecuteSqlTxn(const std::string &sql, ResultWriter &writer,
continue;
}
case StatementType::VARIABLE_SHOW_STATEMENT: {

const auto &show_stmt = dynamic_cast<const VariableShowStatement &>(*statement);
auto content = GetSessionVariable(show_stmt.variable_);
WriteOneCell(fmt::format("{}={}", show_stmt.variable_, content), writer);
continue;
}
case StatementType::VARIABLE_SET_STATEMENT: {

const auto &set_stmt = dynamic_cast<const VariableSetStatement &>(*statement);
session_variables_[set_stmt.variable_] = set_stmt.value_;
continue;
}
case StatementType::EXPLAIN_STATEMENT: {

const auto &explain_stmt = dynamic_cast<const ExplainStatement &>(*statement);
std::string output;

Expand Down Expand Up @@ -345,7 +366,7 @@ auto BustubInstance::ExecuteSqlTxn(const std::string &sql, ResultWriter &writer,
writer.EndRow();
}
writer.EndTable();
// buffer_pool_manager_->FlushAllPages();
// buffer_pool_manager_->FlushAllPages();
}
return is_successful;
}
Expand Down
4 changes: 4 additions & 0 deletions src/concurrency/lock_manager.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,10 @@

namespace bustub {

/*
核心数据结构 就是map[table_oid_t,LockRequestQueue(...,rid,txn,lock_mode_,granted,...)]
然后按照锁的理论逻辑来大量判断
*/
auto LockManager::LockTable(Transaction *txn, LockMode lock_mode, const table_oid_t &oid) -> bool {
// 分隔离情况分别讨论 1.READ_UNCOMMITTED
if (txn->GetIsolationLevel() == IsolationLevel::READ_UNCOMMITTED) {
Expand Down
3 changes: 2 additions & 1 deletion src/include/common/bustub_instance.h
Original file line number Diff line number Diff line change
Expand Up @@ -225,7 +225,7 @@ class BustubInstance {
/**
* Execute a SQL query in the BusTub instance with provided txn.
*/
auto ExecuteSqlTxn(const std::string &sql, ResultWriter &writer, Transaction *txn) -> bool;
auto ExecuteSqlTxn(const std::string &sql, ResultWriter &writer, Transaction *txn,Binder& binder) -> bool;

/**
* FOR TEST ONLY. Generate test tables in this BusTub instance.
Expand Down Expand Up @@ -272,6 +272,7 @@ class BustubInstance {
void CmdDisplayHelp(ResultWriter &writer);
void WriteOneCell(const std::string &cell, ResultWriter &writer);
std::unordered_map<std::string, std::string> session_variables_;
bool isSELECTsql = false;
};

} // namespace bustub
18 changes: 18 additions & 0 deletions src/include/concurrency/lock_manager.h
Original file line number Diff line number Diff line change
Expand Up @@ -326,7 +326,25 @@ class LockManager {
row_lock_set->second.erase(rid);
}

/*
函数维护两个集合:active_set_ 和 safe_set_。
active_set_:包含当前活动(进行中)的事务。
safe_set_:包含已知为安全的事务(即不涉及环的事务)。
对于给定的 txn_id,首先检查它是否已经在 safe_set_ 中。如果是,则返回 false。
否则,将 txn_id 添加到 active_set_ 中。
接下来,它检查当前 txn_id 等待的事务(存储在 waits_for_ 中)。
如果其中任何一个事务已经在 active_set_ 中,说明存在环,函数返回 true。
否则,递归调用自身(Dfs)以处理每个等待的事务(next_node)。
在探索完所有依赖关系后,从 active_set_ 中移除 txn_id,并将其添加到 safe_set_ 中。
最后,返回 false(表示未找到环)。
wait_for [tx1] 事务1在等待哪些事务
如果存在环(即存在依赖循环),函数返回 true。
否则,返回 false。
*/

auto Dfs(txn_id_t txn_id) -> bool {
// 如果 在 safe_set_ 中找到了 txn_id
if (safe_set_.find(txn_id) != safe_set_.end()) {
return false;
}
Expand Down
48 changes: 24 additions & 24 deletions tools/raftDB/raftDB.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -40,30 +40,30 @@ auto main(int arc, char **argv) -> int {
cRaftDB db(abs_path,dbName);
sleep(INT32_MAX);
// c
// auto bustub = std::make_unique<bustub::BustubInstance>(dbName);
// bustub->GenerateMockTable();

// if (bustub->buffer_pool_manager_ != nullptr) {
// bustub->GenerateTestTable();
// }

// linenoiseHistorySetMaxLen(1024);
// linenoiseSetMultiLine(1);
// //cRaftDB db_server(abs_path, dbName);
// co_chan<ApplyMsg> msgCh(100000);
// craft::Raft raft(&db_server, &msgCh);
// raft.launch();

// while (true) {
// auto writer = bustub::FortTableWriter();
// ApplyMsg msg;
// msgCh >> msg;
// auto sqlJson = jsonFromString(msg.command.content);
// spdlog::debug("get commit json : {}" ,sqlJson.dump());
// auto sql = sqlJson["sql"];
// bustub->ExecuteSql(sql, writer);
// //writer 是结果集
// }
auto bustub = std::make_unique<bustub::BustubInstance>(dbName);
bustub->GenerateMockTable();

if (bustub->buffer_pool_manager_ != nullptr) {
bustub->GenerateTestTable();
}

linenoiseHistorySetMaxLen(1024);
linenoiseSetMultiLine(1);
cRaftDB db_server(abs_path, dbName);
co_chan<ApplyMsg> msgCh(100000);
craft::Raft raft(&db_server, &msgCh);
raft.launch();

while (true) {
auto writer = bustub::FortTableWriter();
ApplyMsg msg;
msgCh >> msg;
auto sqlJson = jsonFromString(msg.command.content);
spdlog::debug("get commit json : {}" ,sqlJson.dump());
auto sql = sqlJson["sql"];
bustub->ExecuteSql(sql, writer);
//writer 是结果集
}
sleep(INTMAX_MAX);
}
#else
Expand Down

0 comments on commit 6a56f65

Please sign in to comment.