Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Revert](partial update) Revert "Fix missing rowsets during doing alignment when flushing memtable due to compaction (#28062)" #28674

Merged
merged 1 commit into from
Dec 20, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 2 additions & 7 deletions be/src/olap/olap_common.h
Original file line number Diff line number Diff line change
Expand Up @@ -493,18 +493,13 @@ inline RowsetId extract_rowset_id(std::string_view filename) {
class DeleteBitmap;
// merge on write context
struct MowContext {
MowContext(int64_t version, int64_t txnid, RowsetIdUnorderedSet& ids,
MowContext(int64_t version, int64_t txnid, const RowsetIdUnorderedSet& ids,
std::shared_ptr<DeleteBitmap> db)
: max_version(version), txn_id(txnid), rowset_ids(ids), delete_bitmap(db) {}
void update_rowset_ids_with_lock(std::function<void()> callback) {
std::lock_guard<std::mutex> lock(m);
callback();
}
int64_t max_version;
int64_t txn_id;
RowsetIdUnorderedSet& rowset_ids;
const RowsetIdUnorderedSet& rowset_ids;
std::shared_ptr<DeleteBitmap> delete_bitmap;
std::mutex m; // protection for updating rowset_ids only
};

// used in mow partial update
Expand Down
27 changes: 0 additions & 27 deletions be/src/olap/rowset/beta_rowset_writer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -182,33 +182,6 @@ Status BetaRowsetWriter::_generate_delete_bitmap(int32_t segment_id) {
{
std::shared_lock meta_rlock(tablet->get_header_lock());
specified_rowsets = tablet->get_rowset_by_ids(&_context.mow_context->rowset_ids);
DBUG_EXECUTE_IF("BetaRowsetWriter::_generate_delete_bitmap.clear_specified_rowsets",
{ specified_rowsets.clear(); });
if (specified_rowsets.size() != _context.mow_context->rowset_ids.size()) {
// `get_rowset_by_ids` may fail to find some of the rowsets we request if cumulative compaction deletes
// rowsets from `_rs_version_map` (see `Tablet::modify_rowsets` for details) before we get here.
// Because we haven't begun calculation for the merge-on-write table, we can safely reset the `_context.mow_context->rowset_ids`
// to the latest value and re-request the corresponding rowsets.
LOG(INFO) << fmt::format(
"[Memtable Flush] some rowsets have been deleted due to "
"compaction(specified_rowsets.size()={}, but rowset_ids.size()={}), reset "
"rowset_ids to the latest value. tablet_id: {}, cur max_version: {}, "
"transaction_id: {}",
specified_rowsets.size(), _context.mow_context->rowset_ids.size(),
_context.tablet->tablet_id(), _context.mow_context->max_version,
_context.mow_context->txn_id);
Status st {Status::OK()};
_context.mow_context->update_rowset_ids_with_lock([&]() {
_context.mow_context->rowset_ids.clear();
st = tablet->all_rs_id(_context.mow_context->max_version,
&_context.mow_context->rowset_ids);
});
if (!st.ok()) {
return st;
}
specified_rowsets = tablet->get_rowset_by_ids(&_context.mow_context->rowset_ids);
DCHECK(specified_rowsets.size() == _context.mow_context->rowset_ids.size());
}
}
OlapStopWatch watch;
RETURN_IF_ERROR(tablet->calc_delete_bitmap(rowset, segments, specified_rowsets,
Expand Down
26 changes: 0 additions & 26 deletions be/src/olap/rowset/segment_v2/segment_writer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,6 @@
#include "service/point_query_executor.h"
#include "util/coding.h"
#include "util/crc32c.h"
#include "util/debug_points.h"
#include "util/faststring.h"
#include "util/key_util.h"
#include "vec/columns/column_nullable.h"
Expand Down Expand Up @@ -413,31 +412,6 @@ Status SegmentWriter::append_block_with_partial_content(const vectorized::Block*
{
std::shared_lock rlock(tablet->get_header_lock());
specified_rowsets = tablet->get_rowset_by_ids(&_mow_context->rowset_ids);
DBUG_EXECUTE_IF("_append_block_with_partial_content.clear_specified_rowsets",
{ specified_rowsets.clear(); });
if (specified_rowsets.size() != _mow_context->rowset_ids.size()) {
// `get_rowset_by_ids` may fail to find some of the rowsets we request if cumulative compaction deletes
// rowsets from `_rs_version_map` (see `Tablet::modify_rowsets` for details) before we get here.
// Because we haven't begun calculation for the merge-on-write table, we can safely reset the `_mow_context->rowset_ids`
// to the latest value and re-request the corresponding rowsets.
LOG(INFO) << fmt::format(
"[Memtable Flush] some rowsets have been deleted due to "
"compaction(specified_rowsets.size()={}, but rowset_ids.size()={}), reset "
"rowset_ids to the latest value. tablet_id: {}, cur max_version: {}, "
"transaction_id: {}",
specified_rowsets.size(), _mow_context->rowset_ids.size(), tablet->tablet_id(),
_mow_context->max_version, _mow_context->txn_id);
Status st {Status::OK()};
_mow_context->update_rowset_ids_with_lock([&]() {
_mow_context->rowset_ids.clear();
st = tablet->all_rs_id(_mow_context->max_version, &_mow_context->rowset_ids);
});
if (!st.ok()) {
return st;
}
specified_rowsets = tablet->get_rowset_by_ids(&_mow_context->rowset_ids);
DCHECK(specified_rowsets.size() == _mow_context->rowset_ids.size());
}
}
std::vector<std::unique_ptr<SegmentCacheHandle>> segment_caches(specified_rowsets.size());
// locate rows in base data
Expand Down
26 changes: 0 additions & 26 deletions be/src/olap/rowset/segment_v2/vertical_segment_writer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,6 @@
#include "service/point_query_executor.h"
#include "util/coding.h"
#include "util/crc32c.h"
#include "util/debug_points.h"
#include "util/faststring.h"
#include "util/key_util.h"
#include "vec/columns/column_nullable.h"
Expand Down Expand Up @@ -347,31 +346,6 @@ Status VerticalSegmentWriter::_append_block_with_partial_content(RowsInBlock& da
{
std::shared_lock rlock(tablet->get_header_lock());
specified_rowsets = tablet->get_rowset_by_ids(&_mow_context->rowset_ids);
DBUG_EXECUTE_IF("_append_block_with_partial_content.clear_specified_rowsets",
{ specified_rowsets.clear(); });
if (specified_rowsets.size() != _mow_context->rowset_ids.size()) {
// `get_rowset_by_ids` may fail to find some of the rowsets we request if cumulative compaction deletes
// rowsets from `_rs_version_map` (see `Tablet::modify_rowsets` for details) before we get here.
// Because we haven't begun calculation for the merge-on-write table, we can safely reset the `_mow_context->rowset_ids`
// to the latest value and re-request the corresponding rowsets.
LOG(INFO) << fmt::format(
"[Memtable Flush] some rowsets have been deleted due to "
"compaction(specified_rowsets.size()={}, but rowset_ids.size()={}), reset "
"rowset_ids to the latest value. tablet_id: {}, cur max_version: {}, "
"transaction_id: {}",
specified_rowsets.size(), _mow_context->rowset_ids.size(), tablet->tablet_id(),
_mow_context->max_version, _mow_context->txn_id);
Status st {Status::OK()};
_mow_context->update_rowset_ids_with_lock([&]() {
_mow_context->rowset_ids.clear();
st = tablet->all_rs_id(_mow_context->max_version, &_mow_context->rowset_ids);
});
if (!st.ok()) {
return st;
}
specified_rowsets = tablet->get_rowset_by_ids(&_mow_context->rowset_ids);
DCHECK(specified_rowsets.size() == _mow_context->rowset_ids.size());
}
}
std::vector<std::unique_ptr<SegmentCacheHandle>> segment_caches(specified_rowsets.size());
// locate rows in base data
Expand Down

This file was deleted.

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ suite("test_primary_key_partial_update_publish", "p0") {

file '10000.csv'
time 10000 // limit inflight 10s
}
}
streamLoad {
table "${tableName}"

Expand Down Expand Up @@ -68,5 +68,5 @@ suite("test_primary_key_partial_update_publish", "p0") {
"""

// drop drop
sql """ DROP TABLE IF EXISTS ${tableName} """
// sql """ DROP TABLE IF EXISTS ${tableName} """
}