Skip to content

Commit

Permalink
[refactor](move-memtable) remove phmap and use shared ptr in delta wr…
Browse files Browse the repository at this point in the history
…iter v2 (apache#30949)

* [refactor](move-memtable) remove phmap and use shared ptr in delta writer v2 pool

* ENABLE_FACTORY_CREATOR DeltaWriterV2
  • Loading branch information
kaijchen authored Feb 7, 2024
1 parent 4c4050b commit 960e80c
Show file tree
Hide file tree
Showing 4 changed files with 28 additions and 34 deletions.
2 changes: 2 additions & 0 deletions be/src/olap/delta_writer_v2.h
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,8 @@ class Block;
// Writer for a particular (load, index, tablet).
// This class is NOT thread-safe, external synchronization is required.
class DeltaWriterV2 {
ENABLE_FACTORY_CREATOR(DeltaWriterV2);

public:
DeltaWriterV2(WriteRequest* req, const std::vector<std::shared_ptr<LoadStreamStub>>& streams,
RuntimeState* state);
Expand Down
41 changes: 19 additions & 22 deletions be/src/vec/sink/delta_writer_v2_pool.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -30,12 +30,15 @@ DeltaWriterV2Map::DeltaWriterV2Map(UniqueId load_id, int num_use, DeltaWriterV2P

DeltaWriterV2Map::~DeltaWriterV2Map() = default;

DeltaWriterV2* DeltaWriterV2Map::get_or_create(
std::shared_ptr<DeltaWriterV2> DeltaWriterV2Map::get_or_create(
int64_t tablet_id, std::function<std::unique_ptr<DeltaWriterV2>()> creator) {
_map.lazy_emplace(tablet_id, [&](const TabletToDeltaWriterV2Map::constructor& ctor) {
ctor(tablet_id, creator());
});
return _map.at(tablet_id).get();
std::lock_guard lock(_mutex);
if (_map.contains(tablet_id)) {
return _map.at(tablet_id);
}
std::shared_ptr<DeltaWriterV2> writer = creator();
_map[tablet_id] = writer;
return writer;
}

Status DeltaWriterV2Map::close(RuntimeProfile* profile) {
Expand All @@ -48,22 +51,15 @@ Status DeltaWriterV2Map::close(RuntimeProfile* profile) {
_pool->erase(_load_id);
}
LOG(INFO) << "closing DeltaWriterV2Map, load_id=" << _load_id;
Status status = Status::OK();
_map.for_each([&status](auto& entry) {
if (status.ok()) {
status = entry.second->close();
}
});
if (!status.ok()) {
return status;
std::lock_guard lock(_mutex);
for (auto& [_, writer] : _map) {
RETURN_IF_ERROR(writer->close());
}
LOG(INFO) << "close-waiting DeltaWriterV2Map, load_id=" << _load_id;
_map.for_each([&status, profile](auto& entry) {
if (status.ok()) {
status = entry.second->close_wait(profile);
}
});
return status;
for (auto& [_, writer] : _map) {
RETURN_IF_ERROR(writer->close_wait(profile));
}
return Status::OK();
}

void DeltaWriterV2Map::cancel(Status status) {
Expand All @@ -72,9 +68,10 @@ void DeltaWriterV2Map::cancel(Status status) {
if (num_use == 0 && _pool != nullptr) {
_pool->erase(_load_id);
}
_map.for_each([&status](auto& entry) {
static_cast<void>(entry.second->cancel_with_status(status));
});
std::lock_guard lock(_mutex);
for (auto& [_, writer] : _map) {
static_cast<void>(writer->cancel_with_status(status));
}
}

DeltaWriterV2Pool::DeltaWriterV2Pool() = default;
Expand Down
15 changes: 5 additions & 10 deletions be/src/vec/sink/delta_writer_v2_pool.h
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,6 @@
#include <gen_cpp/types.pb.h>
#include <glog/logging.h>
#include <google/protobuf/stubs/callback.h>
#include <parallel_hashmap/phmap.h>
#include <stddef.h>
#include <stdint.h>

Expand Down Expand Up @@ -67,8 +66,8 @@ class DeltaWriterV2Map {
~DeltaWriterV2Map();

// get or create delta writer for the given tablet, memory is managed by DeltaWriterV2Map
DeltaWriterV2* get_or_create(int64_t tablet_id,
std::function<std::unique_ptr<DeltaWriterV2>()> creator);
std::shared_ptr<DeltaWriterV2> get_or_create(
int64_t tablet_id, std::function<std::unique_ptr<DeltaWriterV2>()> creator);

// close all delta writers in this DeltaWriterV2Map if there is no other users
Status close(RuntimeProfile* profile = nullptr);
Expand All @@ -79,13 +78,9 @@ class DeltaWriterV2Map {
size_t size() const { return _map.size(); }

private:
using TabletToDeltaWriterV2Map = phmap::parallel_flat_hash_map<
int64_t, std::unique_ptr<DeltaWriterV2>, std::hash<int64_t>, std::equal_to<int64_t>,
std::allocator<phmap::Pair<const int64_t, std::unique_ptr<DeltaWriterV2>>>, 4,
std::mutex>;

UniqueId _load_id;
TabletToDeltaWriterV2Map _map;
std::mutex _mutex;
std::unordered_map<int64_t, std::shared_ptr<DeltaWriterV2>> _map;
std::atomic<int> _use_cnt;
DeltaWriterV2Pool* _pool = nullptr;
};
Expand All @@ -111,4 +106,4 @@ class DeltaWriterV2Pool {
};

} // namespace vectorized
} // namespace doris
} // namespace doris
4 changes: 2 additions & 2 deletions be/src/vec/sink/writer/vtablet_writer_v2.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -420,7 +420,7 @@ Status VTabletWriterV2::write(Block& input_block) {

Status VTabletWriterV2::_write_memtable(std::shared_ptr<vectorized::Block> block, int64_t tablet_id,
const Rows& rows, const Streams& streams) {
DeltaWriterV2* delta_writer = _delta_writer_for_tablet->get_or_create(tablet_id, [&]() {
auto delta_writer = _delta_writer_for_tablet->get_or_create(tablet_id, [&]() {
WriteRequest req {
.tablet_id = tablet_id,
.txn_id = _txn_id,
Expand All @@ -446,7 +446,7 @@ Status VTabletWriterV2::_write_memtable(std::shared_ptr<vectorized::Block> block
<< " not found in schema, load_id=" << print_id(_load_id);
return std::unique_ptr<DeltaWriterV2>(nullptr);
}
return std::make_unique<DeltaWriterV2>(&req, streams, _state);
return DeltaWriterV2::create_unique(&req, streams, _state);
});
if (delta_writer == nullptr) {
LOG(WARNING) << "failed to open DeltaWriter for tablet " << tablet_id
Expand Down

0 comments on commit 960e80c

Please sign in to comment.