Skip to content

Commit

Permalink
[fix](move-memtable) fix timeout to get tablet schema
Browse files: browse the repository at this point in the history
  • Loading branch information
liaoxin01 committed Apr 4, 2024
1 parent 1f37082 commit 575318a
Show file tree
Hide file tree
Showing 10 changed files with 63 additions and 64 deletions.
2 changes: 1 addition & 1 deletion be/src/runtime/exec_env.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@
#include "util/debug_util.h"
#include "util/time.h"
#include "vec/sink/delta_writer_v2_pool.h"
#include "vec/sink/load_stream_stub_pool.h"
#include "vec/sink/load_stream_map_pool.h"

namespace doris {

Expand Down
6 changes: 3 additions & 3 deletions be/src/runtime/exec_env.h
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,7 @@ class RuntimeQueryStatiticsMgr;
class TMasterInfo;
class LoadChannelMgr;
class LoadStreamMgr;
class LoadStreamStubPool;
class LoadStreamMapPool;
class StreamLoadExecutor;
class RoutineLoadTaskExecutor;
class SmallFileMgr;
Expand Down Expand Up @@ -242,7 +242,7 @@ class ExecEnv {
}

#endif
LoadStreamStubPool* load_stream_stub_pool() { return _load_stream_stub_pool.get(); }
LoadStreamMapPool* load_stream_map_pool() { return _load_stream_map_pool.get(); }

vectorized::DeltaWriterV2Pool* delta_writer_v2_pool() { return _delta_writer_v2_pool.get(); }

Expand Down Expand Up @@ -360,7 +360,7 @@ class ExecEnv {
// To save meta info of external file, such as parquet footer.
FileMetaCache* _file_meta_cache = nullptr;
std::unique_ptr<MemTableMemoryLimiter> _memtable_memory_limiter;
std::unique_ptr<LoadStreamStubPool> _load_stream_stub_pool;
std::unique_ptr<LoadStreamMapPool> _load_stream_map_pool;
std::unique_ptr<vectorized::DeltaWriterV2Pool> _delta_writer_v2_pool;
std::shared_ptr<WalManager> _wal_manager;

Expand Down
6 changes: 3 additions & 3 deletions be/src/runtime/exec_env_init.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -98,7 +98,7 @@
#include "vec/exec/scan/scanner_scheduler.h"
#include "vec/runtime/vdata_stream_mgr.h"
#include "vec/sink/delta_writer_v2_pool.h"
#include "vec/sink/load_stream_stub_pool.h"
#include "vec/sink/load_stream_map_pool.h"
#include "vec/spill/spill_stream_manager.h"

#if !defined(__SANITIZE_ADDRESS__) && !defined(ADDRESS_SANITIZER) && !defined(LEAK_SANITIZER) && \
Expand Down Expand Up @@ -229,7 +229,7 @@ Status ExecEnv::_init(const std::vector<StorePath>& store_paths,
_block_spill_mgr = new BlockSpillManager(store_paths);
_group_commit_mgr = new GroupCommitMgr(this);
_memtable_memory_limiter = std::make_unique<MemTableMemoryLimiter>();
_load_stream_stub_pool = std::make_unique<LoadStreamStubPool>();
_load_stream_map_pool = std::make_unique<LoadStreamMapPool>();
_delta_writer_v2_pool = std::make_unique<vectorized::DeltaWriterV2Pool>();
_wal_manager = WalManager::create_shared(this, config::group_commit_wal_path);
_spill_stream_mgr = new vectorized::SpillStreamManager(spill_store_paths);
Expand Down Expand Up @@ -552,7 +552,7 @@ void ExecEnv::destroy() {
_stream_load_executor.reset();
_memtable_memory_limiter.reset();
_delta_writer_v2_pool.reset();
_load_stream_stub_pool.reset();
_load_stream_map_pool.reset();
SAFE_STOP(_storage_engine);
SAFE_STOP(_spill_stream_mgr);
SAFE_SHUTDOWN(_buffered_reader_prefetch_thread_pool);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,21 +15,22 @@
// specific language governing permissions and limitations
// under the License.

#include "vec/sink/load_stream_stub_pool.h"
#include "vec/sink/load_stream_map_pool.h"

#include "util/debug_points.h"
#include "vec/sink/load_stream_stub.h"

namespace doris {
class TExpr;

LoadStreamMap::LoadStreamMap(UniqueId load_id, int64_t src_id, int num_streams, int num_use,
LoadStreamStubPool* pool)
LoadStreamMapPool* pool)
: _load_id(load_id),
_src_id(src_id),
_num_streams(num_streams),
_use_cnt(num_use),
_pool(pool) {
_pool(pool),
_tablet_schema_for_index(std::make_shared<IndexToTabletSchema>()),
_enable_unique_mow_for_index(std::make_shared<IndexToEnableMoW>()) {
DCHECK(num_streams > 0) << "stream num should be greater than 0";
DCHECK(num_use > 0) << "use num should be greater than 0";
}
Expand All @@ -41,10 +42,9 @@ std::shared_ptr<Streams> LoadStreamMap::get_or_create(int64_t dst_id) {
return streams;
}
streams = std::make_shared<Streams>();
auto schema_map = std::make_shared<IndexToTabletSchema>();
auto mow_map = std::make_shared<IndexToEnableMoW>();
for (int i = 0; i < _num_streams; i++) {
streams->emplace_back(new LoadStreamStub(_load_id, _src_id, schema_map, mow_map));
streams->emplace_back(new LoadStreamStub(_load_id, _src_id, _tablet_schema_for_index,
_enable_unique_mow_for_index));
}
_streams_for_node[dst_id] = streams;
return streams;
Expand Down Expand Up @@ -103,11 +103,11 @@ Status LoadStreamMap::close_load() {
});
}

LoadStreamStubPool::LoadStreamStubPool() = default;
LoadStreamMapPool::LoadStreamMapPool() = default;

LoadStreamStubPool::~LoadStreamStubPool() = default;
std::shared_ptr<LoadStreamMap> LoadStreamStubPool::get_or_create(UniqueId load_id, int64_t src_id,
int num_streams, int num_use) {
LoadStreamMapPool::~LoadStreamMapPool() = default;
std::shared_ptr<LoadStreamMap> LoadStreamMapPool::get_or_create(UniqueId load_id, int64_t src_id,
int num_streams, int num_use) {
std::lock_guard<std::mutex> lock(_mutex);
std::shared_ptr<LoadStreamMap> streams = _pool[load_id];
if (streams != nullptr) {
Expand All @@ -118,7 +118,7 @@ std::shared_ptr<LoadStreamMap> LoadStreamStubPool::get_or_create(UniqueId load_i
return streams;
}

void LoadStreamStubPool::erase(UniqueId load_id) {
void LoadStreamMapPool::erase(UniqueId load_id) {
std::lock_guard<std::mutex> lock(_mutex);
_pool.erase(load_id);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -63,19 +63,20 @@
#include "vec/core/block.h"
#include "vec/data_types/data_type.h"
#include "vec/exprs/vexpr_fwd.h"
#include "vec/sink/load_stream_stub.h"

namespace doris {

class LoadStreamStub;

class LoadStreamStubPool;
class LoadStreamMapPool;

using Streams = std::vector<std::shared_ptr<LoadStreamStub>>;

class LoadStreamMap {
public:
LoadStreamMap(UniqueId load_id, int64_t src_id, int num_streams, int num_use,
LoadStreamStubPool* pool);
LoadStreamMapPool* pool);

std::shared_ptr<Streams> get_or_create(int64_t dst_id);

Expand Down Expand Up @@ -103,17 +104,19 @@ class LoadStreamMap {
std::atomic<int> _use_cnt;
std::mutex _mutex;
std::unordered_map<int64_t, std::shared_ptr<Streams>> _streams_for_node;
LoadStreamStubPool* _pool = nullptr;
LoadStreamMapPool* _pool = nullptr;
std::shared_ptr<IndexToTabletSchema> _tablet_schema_for_index;
std::shared_ptr<IndexToEnableMoW> _enable_unique_mow_for_index;

std::mutex _tablets_to_commit_mutex;
std::unordered_map<int64_t, std::vector<PTabletID>> _tablets_to_commit;
};

class LoadStreamStubPool {
class LoadStreamMapPool {
public:
LoadStreamStubPool();
LoadStreamMapPool();

~LoadStreamStubPool();
~LoadStreamMapPool();

std::shared_ptr<LoadStreamMap> get_or_create(UniqueId load_id, int64_t src_id, int num_streams,
int num_use);
Expand Down
2 changes: 1 addition & 1 deletion be/src/vec/sink/load_stream_stub.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -250,7 +250,7 @@ Status LoadStreamStub::get_schema(const std::vector<PTabletID>& tablets) {
PStreamHeader header;
*header.mutable_load_id() = _load_id;
header.set_src_id(_src_id);
header.set_opcode(doris::PStreamHeader::CLOSE_LOAD);
header.set_opcode(doris::PStreamHeader::GET_SCHEMA);
std::ostringstream oss;
oss << "fetching tablet schema from stream " << _stream_id
<< ", load id: " << print_id(_load_id) << ", tablet id:";
Expand Down
2 changes: 0 additions & 2 deletions be/src/vec/sink/volap_table_sink_v2.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -31,8 +31,6 @@
#include "runtime/runtime_state.h"
#include "util/doris_metrics.h"
#include "vec/sink/delta_writer_v2_pool.h"
#include "vec/sink/load_stream_stub.h"
#include "vec/sink/load_stream_stub_pool.h"

namespace doris {
class TExpr;
Expand Down
55 changes: 27 additions & 28 deletions be/src/vec/sink/writer/vtablet_writer_v2.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -50,8 +50,8 @@
#include "util/uid_util.h"
#include "vec/core/block.h"
#include "vec/sink/delta_writer_v2_pool.h"
#include "vec/sink/load_stream_map_pool.h"
#include "vec/sink/load_stream_stub.h"
#include "vec/sink/load_stream_stub_pool.h"
#include "vec/sink/vtablet_block_convertor.h"
#include "vec/sink/vtablet_finder.h"

Expand Down Expand Up @@ -98,7 +98,7 @@ Status VTabletWriterV2::_incremental_open_streams(
tablet.set_partition_id(partition->id);
tablet.set_index_id(index.index_id);
tablet.set_tablet_id(tablet_id);
if (!_streams_for_node->contains(node)) {
if (!_load_stream_map->contains(node)) {
new_backends.insert(node);
}
_tablets_for_node[node].emplace(tablet_id, tablet);
Expand All @@ -112,7 +112,7 @@ Status VTabletWriterV2::_incremental_open_streams(
}
}
for (int64_t dst_id : new_backends) {
auto streams = _streams_for_node->get_or_create(dst_id);
auto streams = _load_stream_map->get_or_create(dst_id);
RETURN_IF_ERROR(_open_streams_to_backend(dst_id, *streams));
}
return Status::OK();
Expand Down Expand Up @@ -240,7 +240,7 @@ Status VTabletWriterV2::_init(RuntimeState* state, RuntimeProfile* profile) {
} else {
_delta_writer_for_tablet = std::make_shared<DeltaWriterV2Map>(_load_id);
}
_streams_for_node = ExecEnv::GetInstance()->load_stream_stub_pool()->get_or_create(
_load_stream_map = ExecEnv::GetInstance()->load_stream_map_pool()->get_or_create(
_load_id, _backend_id, _stream_per_node, _num_local_sink);
return Status::OK();
}
Expand All @@ -261,7 +261,7 @@ Status VTabletWriterV2::open(RuntimeState* state, RuntimeProfile* profile) {

Status VTabletWriterV2::_open_streams() {
for (auto& [dst_id, _] : _tablets_for_node) {
auto streams = _streams_for_node->get_or_create(dst_id);
auto streams = _load_stream_map->get_or_create(dst_id);
RETURN_IF_ERROR(_open_streams_to_backend(dst_id, *streams));
}
return Status::OK();
Expand Down Expand Up @@ -358,7 +358,7 @@ Status VTabletWriterV2::_select_streams(int64_t tablet_id, int64_t partition_id,
tablet.set_index_id(index_id);
tablet.set_tablet_id(tablet_id);
_tablets_for_node[node_id].emplace(tablet_id, tablet);
streams.emplace_back(_streams_for_node->at(node_id)->at(_stream_index));
streams.emplace_back(_load_stream_map->at(node_id)->at(_stream_index));
RETURN_IF_ERROR(streams[0]->wait_for_schema(partition_id, index_id, tablet_id));
}
_stream_index = (_stream_index + 1) % _stream_per_node;
Expand Down Expand Up @@ -467,13 +467,13 @@ Status VTabletWriterV2::_cancel(Status status) {
_delta_writer_for_tablet->cancel(status);
_delta_writer_for_tablet.reset();
}
if (_streams_for_node) {
_streams_for_node->for_each([status](int64_t dst_id, const Streams& streams) {
if (_load_stream_map) {
_load_stream_map->for_each([status](int64_t dst_id, const Streams& streams) {
for (auto& stream : streams) {
stream->cancel(status);
}
});
_streams_for_node->release();
_load_stream_map->release();
}
return Status::OK();
}
Expand Down Expand Up @@ -539,29 +539,28 @@ Status VTabletWriterV2::close(Status exec_status) {
}

_calc_tablets_to_commit();
const bool is_last_sink = _streams_for_node->release();
const bool is_last_sink = _load_stream_map->release();
LOG(INFO) << "sink " << _sender_id << " released streams, is_last=" << is_last_sink
<< ", load_id=" << print_id(_load_id);

// send CLOSE_LOAD and close_wait on all streams
if (is_last_sink) {
RETURN_IF_ERROR(_streams_for_node->close_load());
RETURN_IF_ERROR(_load_stream_map->close_load());
SCOPED_TIMER(_close_load_timer);
RETURN_IF_ERROR(_streams_for_node->for_each_st(
[this](int64_t dst_id, const Streams& streams) -> Status {
for (auto& stream : streams) {
int64_t remain_ms =
static_cast<int64_t>(_state->execution_timeout()) * 1000 -
_timeout_watch.elapsed_time() / 1000 / 1000;
if (remain_ms <= 0) {
LOG(WARNING) << "load timed out before close waiting, load_id="
<< print_id(_load_id);
return Status::TimedOut("load timed out before close waiting");
}
RETURN_IF_ERROR(stream->close_wait(_state, remain_ms));
}
return Status::OK();
}));
RETURN_IF_ERROR(_load_stream_map->for_each_st([this](int64_t dst_id,
const Streams& streams) -> Status {
for (auto& stream : streams) {
int64_t remain_ms = static_cast<int64_t>(_state->execution_timeout()) * 1000 -
_timeout_watch.elapsed_time() / 1000 / 1000;
if (remain_ms <= 0) {
LOG(WARNING) << "load timed out before close waiting, load_id="
<< print_id(_load_id);
return Status::TimedOut("load timed out before close waiting");
}
RETURN_IF_ERROR(stream->close_wait(_state, remain_ms));
}
return Status::OK();
}));
}

// calculate and submit commit info
Expand All @@ -570,7 +569,7 @@ Status VTabletWriterV2::close(Status exec_status) {
std::unordered_map<int64_t, Status> failed_reason;
std::vector<TTabletCommitInfo> tablet_commit_infos;

_streams_for_node->for_each([&](int64_t dst_id, const Streams& streams) {
_load_stream_map->for_each([&](int64_t dst_id, const Streams& streams) {
std::unordered_set<int64_t> known_tablets;
for (const auto& stream : streams) {
for (auto [tablet_id, reason] : stream->failed_tablets()) {
Expand Down Expand Up @@ -645,7 +644,7 @@ void VTabletWriterV2::_calc_tablets_to_commit() {
}
LOG(WARNING) << msg;
}
_streams_for_node->save_tablets_to_commit(dst_id, tablets_to_commit);
_load_stream_map->save_tablets_to_commit(dst_id, tablets_to_commit);
}
}

Expand Down
2 changes: 1 addition & 1 deletion be/src/vec/sink/writer/vtablet_writer_v2.h
Original file line number Diff line number Diff line change
Expand Up @@ -217,7 +217,7 @@ class VTabletWriterV2 final : public AsyncResultWriter {
std::unordered_map<int64_t, std::unordered_map<int64_t, PTabletID>> _tablets_for_node;
std::unordered_map<int64_t, std::vector<PTabletID>> _indexes_from_node;

std::shared_ptr<LoadStreamMap> _streams_for_node;
std::shared_ptr<LoadStreamMap> _load_stream_map;

size_t _stream_index = 0;
std::shared_ptr<DeltaWriterV2Map> _delta_writer_for_tablet;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,22 +14,21 @@
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
#include "vec/sink/load_stream_stub_pool.h"

#include <gtest/gtest.h>

#include "vec/sink/load_stream_map_pool.h"
#include "vec/sink/load_stream_stub.h"

namespace doris {

class LoadStreamStubPoolTest : public testing::Test {
class LoadStreamMapPoolTest : public testing::Test {
public:
LoadStreamStubPoolTest() = default;
virtual ~LoadStreamStubPoolTest() = default;
LoadStreamMapPoolTest() = default;
virtual ~LoadStreamMapPoolTest() = default;
};

TEST_F(LoadStreamStubPoolTest, test) {
LoadStreamStubPool pool;
TEST_F(LoadStreamMapPoolTest, test) {
LoadStreamMapPool pool;
int64_t src_id = 100;
PUniqueId load_id;
load_id.set_lo(1);
Expand Down

0 comments on commit 575318a

Please sign in to comment.