Skip to content

Commit

Permalink
feat: 合并 MingTai 的分支,完成复杂子查询
Browse files Browse the repository at this point in the history
  • Loading branch information
Kosthi committed Oct 3, 2024
2 parents 3da8111 + c6c8cfe commit 54f7ab8
Show file tree
Hide file tree
Showing 32 changed files with 7,532 additions and 3,993 deletions.
2 changes: 1 addition & 1 deletion deps/common/lang/defer.h
Original file line number Diff line number Diff line change
Expand Up @@ -40,4 +40,4 @@ class DeferHelper

#define SCOPE_UNIQUE_NAME(B) _SCOPE_UNIQUE_NAME(B, __LINE__)

#define DEFER(...) common::DeferHelper SCOPE_UNIQUE_NAME(defer_helper_)([&]() { __VA_ARGS__; })
#define DEFER(...) common::DeferHelper SCOPE_UNIQUE_NAME(defer_helper_)([&]() { __VA_ARGS__; })
8 changes: 8 additions & 0 deletions src/observer/common/value.h
Original file line number Diff line number Diff line change
Expand Up @@ -120,6 +120,14 @@ class Value final
bool is_null() const { return is_null_; }
inline bool is_str() const { return attr_type_ == AttrType::CHARS; }

static int implicit_cast_cost(AttrType from, AttrType to)
{
if (from == to) {
return 0;
}
return DataType::type_instance(from)->cast_cost(to);
}

private:
void set_null();
void set_int(int val);
Expand Down
95 changes: 43 additions & 52 deletions src/observer/net/plain_communicator.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -160,6 +160,7 @@ RC PlainCommunicator::write_debug(SessionEvent *request, bool &need_disconnect)
RC PlainCommunicator::write_result(SessionEvent *event, bool &need_disconnect)
{
RC rc = write_result_internal(event, need_disconnect);

if (!need_disconnect) {
RC rc1 = write_debug(event, need_disconnect);
if (OB_FAIL(rc1)) {
Expand All @@ -180,8 +181,7 @@ RC PlainCommunicator::write_result(SessionEvent *event, bool &need_disconnect)

RC PlainCommunicator::write_result_internal(SessionEvent *event, bool &need_disconnect)
{
RC rc = RC::SUCCESS;

RC rc = RC::SUCCESS;
need_disconnect = true;

SqlResult *sql_result = event->sql_result();
Expand All @@ -205,46 +205,53 @@ RC PlainCommunicator::write_result_internal(SessionEvent *event, bool &need_disc
const char *alias = spec.alias();
if (nullptr != alias || alias[0] != 0) {
if (0 != i) {
const char *delim = " | ";

rc = writer_->writen(delim, strlen(delim));
if (OB_FAIL(rc)) {
LOG_WARN("failed to send data to client. err=%s", strerror(errno));
return rc;
}
}

int len = strlen(alias);

rc = writer_->writen(alias, len);
if (OB_FAIL(rc)) {
LOG_WARN("failed to send data to client. err=%s", strerror(errno));
sql_result->close();
return rc;
title_stream << " | ";
}
title_stream << alias; // 将alias存入流中
}
}

if (cell_num > 0) {
char newline = '\n';

rc = writer_->writen(&newline, 1);
if (OB_FAIL(rc)) {
LOG_WARN("failed to send data to client. err=%s", strerror(errno));
sql_result->close();
return rc;
}
title_stream << '\n'; // 将换行符存入流中
}

rc = RC::SUCCESS;
if (event->session()->get_execution_mode() == ExecutionMode::CHUNK_ITERATOR && event->session()->used_chunk_mode()) {
rc = write_chunk_result(sql_result);
} else {
rc = write_tuple_result(sql_result);
}

if (OB_FAIL(rc)) {
return rc;
sql_result->close();
sql_result->set_return_code(rc);
return write_state(event, need_disconnect);
} else {
// 将 title_stream 中的内容一次性写入到 writer_
std::string buffer = title_stream.str(); // 获取整个缓冲区的内容
title_stream.str(""); // 清空流内容
title_stream.clear(); // 重置流状态(但不会清空内容)

rc = writer_->writen(buffer.c_str(), buffer.size());
if (OB_FAIL(rc)) {
LOG_WARN("failed to send data to client. err=%s", strerror(errno));
sql_result->close();
return rc;
}

buffer.clear(); // 清空字符串内容

// 将 sql_result_stream 中的内容一次性写入到 writer_
buffer = sql_result_stream.str(); // 获取缓冲区的内容
sql_result_stream.str(""); // 清空流内容
sql_result_stream.clear(); // 重置流状态

rc = writer_->writen(buffer.c_str(), buffer.size());
buffer.clear(); // 清空字符串内容
if (OB_FAIL(rc)) {
LOG_WARN("failed to send data to client. err=%s", strerror(errno));
sql_result->close();
return rc;
}
}

if (cell_num == 0) {
Expand Down Expand Up @@ -273,20 +280,15 @@ RC PlainCommunicator::write_tuple_result(SqlResult *sql_result)
{
RC rc = RC::SUCCESS;
Tuple *tuple = nullptr;

while (RC::SUCCESS == (rc = sql_result->next_tuple(tuple))) {
assert(tuple != nullptr);

int cell_num = tuple->cell_num();
for (int i = 0; i < cell_num; i++) {
if (i != 0) {
const char *delim = " | ";

rc = writer_->writen(delim, strlen(delim));
if (OB_FAIL(rc)) {
LOG_WARN("failed to send data to client. err=%s", strerror(errno));
sql_result->close();
return rc;
}
sql_result_stream << delim; // 将分隔符存入流中
}

Value value;
Expand All @@ -297,24 +299,13 @@ RC PlainCommunicator::write_tuple_result(SqlResult *sql_result)
return rc;
}

string cell_str = value.to_string();

rc = writer_->writen(cell_str.data(), cell_str.size());
if (OB_FAIL(rc)) {
LOG_WARN("failed to send data to client. err=%s", strerror(errno));
sql_result->close();
return rc;
}
// 将cell的值存入流中
std::string cell_str = value.to_string();
sql_result_stream << cell_str;
}

char newline = '\n';

rc = writer_->writen(&newline, 1);
if (OB_FAIL(rc)) {
LOG_WARN("failed to send data to client. err=%s", strerror(errno));
sql_result->close();
return rc;
}
// 将换行符存入流中
sql_result_stream << '\n';
}

if (rc == RC::RECORD_EOF) {
Expand Down Expand Up @@ -369,4 +360,4 @@ RC PlainCommunicator::write_chunk_result(SqlResult *sql_result)
rc = RC::SUCCESS;
}
return rc;
}
}
6 changes: 4 additions & 2 deletions src/observer/net/plain_communicator.h
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,8 @@ class PlainCommunicator : public Communicator
RC write_chunk_result(SqlResult *sql_result);

protected:
vector<char> send_message_delimiter_; ///< 发送消息分隔符
vector<char> debug_message_prefix_; ///< 调试信息前缀
vector<char> send_message_delimiter_; ///< 发送消息分隔符
vector<char> debug_message_prefix_; ///< 调试信息前缀
std::ostringstream title_stream;
std::ostringstream sql_result_stream;
};
3 changes: 2 additions & 1 deletion src/observer/sql/executor/create_index_executor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -32,5 +32,6 @@ RC CreateIndexExecutor::execute(SQLStageEvent *sql_event)

Trx *trx = session->current_trx();
Table *table = create_index_stmt->table();
return table->create_index(trx, create_index_stmt->field_meta(), create_index_stmt->index_name().c_str(), create_index_stmt->unique());
return table->create_index(
trx, create_index_stmt->field_meta(), create_index_stmt->index_name().c_str(), create_index_stmt->unique());
}
Loading

0 comments on commit 54f7ab8

Please sign in to comment.