Skip to content

Commit 485d7db

Browse files
authored
[fix](partial update) Fix missing rowsets during doing alignment when flushing memtable due to compaction (apache#28062)
1 parent a3cd36c commit 485d7db

File tree

7 files changed

+182
-4
lines changed

7 files changed

+182
-4
lines changed

be/src/olap/olap_common.h

+7-2
Original file line numberDiff line numberDiff line change
@@ -493,13 +493,18 @@ inline RowsetId extract_rowset_id(std::string_view filename) {
493493
class DeleteBitmap;
494494
// merge on write context
495495
struct MowContext {
496-
MowContext(int64_t version, int64_t txnid, const RowsetIdUnorderedSet& ids,
496+
MowContext(int64_t version, int64_t txnid, RowsetIdUnorderedSet& ids,
497497
std::shared_ptr<DeleteBitmap> db)
498498
: max_version(version), txn_id(txnid), rowset_ids(ids), delete_bitmap(db) {}
499+
void update_rowset_ids_with_lock(std::function<void()> callback) {
500+
std::lock_guard<std::mutex> lock(m);
501+
callback();
502+
}
499503
int64_t max_version;
500504
int64_t txn_id;
501-
const RowsetIdUnorderedSet& rowset_ids;
505+
RowsetIdUnorderedSet& rowset_ids;
502506
std::shared_ptr<DeleteBitmap> delete_bitmap;
507+
std::mutex m; // protection for updating rowset_ids only
503508
};
504509

505510
// used in mow partial update

be/src/olap/rowset/beta_rowset_writer.cpp

+27
Original file line numberDiff line numberDiff line change
@@ -182,6 +182,33 @@ Status BetaRowsetWriter::_generate_delete_bitmap(int32_t segment_id) {
182182
{
183183
std::shared_lock meta_rlock(tablet->get_header_lock());
184184
specified_rowsets = tablet->get_rowset_by_ids(&_context.mow_context->rowset_ids);
185+
DBUG_EXECUTE_IF("BetaRowsetWriter::_generate_delete_bitmap.clear_specified_rowsets",
186+
{ specified_rowsets.clear(); });
187+
if (specified_rowsets.size() != _context.mow_context->rowset_ids.size()) {
188+
// `get_rowset_by_ids` may fail to find some of the rowsets we request if cumulative compaction deletes
189+
// rowsets from `_rs_version_map` (see `Tablet::modify_rowsets` for details) before we get here.
190+
// Because we haven't begun the calculation for the merge-on-write table, we can safely reset the `_context.mow_context->rowset_ids`
191+
// to the latest value and re-request the corresponding rowsets.
192+
LOG(INFO) << fmt::format(
193+
"[Memtable Flush] some rowsets have been deleted due to "
194+
"compaction(specified_rowsets.size()={}, but rowset_ids.size()={}), reset "
195+
"rowset_ids to the latest value. tablet_id: {}, cur max_version: {}, "
196+
"transaction_id: {}",
197+
specified_rowsets.size(), _context.mow_context->rowset_ids.size(),
198+
_context.tablet->tablet_id(), _context.mow_context->max_version,
199+
_context.mow_context->txn_id);
200+
Status st {Status::OK()};
201+
_context.mow_context->update_rowset_ids_with_lock([&]() {
202+
_context.mow_context->rowset_ids.clear();
203+
st = tablet->all_rs_id(_context.mow_context->max_version,
204+
&_context.mow_context->rowset_ids);
205+
});
206+
if (!st.ok()) {
207+
return st;
208+
}
209+
specified_rowsets = tablet->get_rowset_by_ids(&_context.mow_context->rowset_ids);
210+
DCHECK(specified_rowsets.size() == _context.mow_context->rowset_ids.size());
211+
}
185212
}
186213
OlapStopWatch watch;
187214
RETURN_IF_ERROR(tablet->calc_delete_bitmap(rowset, segments, specified_rowsets,

be/src/olap/rowset/segment_v2/segment_writer.cpp

+26
Original file line numberDiff line numberDiff line change
@@ -52,6 +52,7 @@
5252
#include "service/point_query_executor.h"
5353
#include "util/coding.h"
5454
#include "util/crc32c.h"
55+
#include "util/debug_points.h"
5556
#include "util/faststring.h"
5657
#include "util/key_util.h"
5758
#include "vec/columns/column_nullable.h"
@@ -411,6 +412,31 @@ Status SegmentWriter::append_block_with_partial_content(const vectorized::Block*
411412
{
412413
std::shared_lock rlock(tablet->get_header_lock());
413414
specified_rowsets = tablet->get_rowset_by_ids(&_mow_context->rowset_ids);
415+
DBUG_EXECUTE_IF("_append_block_with_partial_content.clear_specified_rowsets",
416+
{ specified_rowsets.clear(); });
417+
if (specified_rowsets.size() != _mow_context->rowset_ids.size()) {
418+
// `get_rowset_by_ids` may fail to find some of the rowsets we request if cumulative compaction deletes
419+
// rowsets from `_rs_version_map` (see `Tablet::modify_rowsets` for details) before we get here.
420+
// Because we haven't begun the calculation for the merge-on-write table, we can safely reset the `_mow_context->rowset_ids`
421+
// to the latest value and re-request the corresponding rowsets.
422+
LOG(INFO) << fmt::format(
423+
"[Memtable Flush] some rowsets have been deleted due to "
424+
"compaction(specified_rowsets.size()={}, but rowset_ids.size()={}), reset "
425+
"rowset_ids to the latest value. tablet_id: {}, cur max_version: {}, "
426+
"transaction_id: {}",
427+
specified_rowsets.size(), _mow_context->rowset_ids.size(), tablet->tablet_id(),
428+
_mow_context->max_version, _mow_context->txn_id);
429+
Status st {Status::OK()};
430+
_mow_context->update_rowset_ids_with_lock([&]() {
431+
_mow_context->rowset_ids.clear();
432+
st = tablet->all_rs_id(_mow_context->max_version, &_mow_context->rowset_ids);
433+
});
434+
if (!st.ok()) {
435+
return st;
436+
}
437+
specified_rowsets = tablet->get_rowset_by_ids(&_mow_context->rowset_ids);
438+
DCHECK(specified_rowsets.size() == _mow_context->rowset_ids.size());
439+
}
414440
}
415441
std::vector<std::unique_ptr<SegmentCacheHandle>> segment_caches(specified_rowsets.size());
416442
// locate rows in base data

be/src/olap/rowset/segment_v2/vertical_segment_writer.cpp

+26
Original file line numberDiff line numberDiff line change
@@ -49,6 +49,7 @@
4949
#include "service/point_query_executor.h"
5050
#include "util/coding.h"
5151
#include "util/crc32c.h"
52+
#include "util/debug_points.h"
5253
#include "util/faststring.h"
5354
#include "util/key_util.h"
5455
#include "vec/columns/column_nullable.h"
@@ -344,6 +345,31 @@ Status VerticalSegmentWriter::_append_block_with_partial_content(RowsInBlock& da
344345
{
345346
std::shared_lock rlock(tablet->get_header_lock());
346347
specified_rowsets = tablet->get_rowset_by_ids(&_mow_context->rowset_ids);
348+
DBUG_EXECUTE_IF("_append_block_with_partial_content.clear_specified_rowsets",
349+
{ specified_rowsets.clear(); });
350+
if (specified_rowsets.size() != _mow_context->rowset_ids.size()) {
351+
// `get_rowset_by_ids` may fail to find some of the rowsets we request if cumulative compaction deletes
352+
// rowsets from `_rs_version_map` (see `Tablet::modify_rowsets` for details) before we get here.
353+
// Because we haven't begun the calculation for the merge-on-write table, we can safely reset the `_mow_context->rowset_ids`
354+
// to the latest value and re-request the corresponding rowsets.
355+
LOG(INFO) << fmt::format(
356+
"[Memtable Flush] some rowsets have been deleted due to "
357+
"compaction(specified_rowsets.size()={}, but rowset_ids.size()={}), reset "
358+
"rowset_ids to the latest value. tablet_id: {}, cur max_version: {}, "
359+
"transaction_id: {}",
360+
specified_rowsets.size(), _mow_context->rowset_ids.size(), tablet->tablet_id(),
361+
_mow_context->max_version, _mow_context->txn_id);
362+
Status st {Status::OK()};
363+
_mow_context->update_rowset_ids_with_lock([&]() {
364+
_mow_context->rowset_ids.clear();
365+
st = tablet->all_rs_id(_mow_context->max_version, &_mow_context->rowset_ids);
366+
});
367+
if (!st.ok()) {
368+
return st;
369+
}
370+
specified_rowsets = tablet->get_rowset_by_ids(&_mow_context->rowset_ids);
371+
DCHECK(specified_rowsets.size() == _mow_context->rowset_ids.size());
372+
}
347373
}
348374
std::vector<std::unique_ptr<SegmentCacheHandle>> segment_caches(specified_rowsets.size());
349375
// locate rows in base data
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,19 @@
1+
-- This file is automatically generated. You should know what you did if you want to edit this
2+
-- !1 --
3+
1 doris 1000 123 1
4+
2 doris2 2000 223 1
5+
6+
-- !2 --
7+
1 doris 200 123 1
8+
2 doris2 400 223 1
9+
4 yixiu 400 \N 4321
10+
11+
-- !3 --
12+
1 doris 1000 123 1
13+
2 doris2 2000 223 1
14+
15+
-- !4 --
16+
1 doris333 6666 555 4
17+
2 doris666 9999 888 7
18+
3 doris222 1111 987 567
19+
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,75 @@
1+
// Licensed to the Apache Software Foundation (ASF) under one
2+
// or more contributor license agreements. See the NOTICE file
3+
// distributed with this work for additional information
4+
// regarding copyright ownership. The ASF licenses this file
5+
// to you under the Apache License, Version 2.0 (the
6+
// "License"); you may not use this file except in compliance
7+
// with the License. You may obtain a copy of the License at
8+
//
9+
// http://www.apache.org/licenses/LICENSE-2.0
10+
//
11+
// Unless required by applicable law or agreed to in writing,
12+
// software distributed under the License is distributed on an
13+
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14+
// KIND, either express or implied. See the License for the
15+
// specific language governing permissions and limitations
16+
// under the License.
17+
18+
import org.codehaus.groovy.runtime.IOGroovyMethods
19+
import org.apache.doris.regression.util.Http
20+
21+
suite("test_unique_key_mow_rowsets_deleted", "nonConcurrent") {

    def tableName = "test_unique_key_mow_rowsets_deleted1"

    // Case 1: rowsets captured by the MowContext are deleted by compaction
    // while a partial-update memtable flush is in progress; the debug point
    // forces the "specified rowsets missing" path in the segment writer.
    sql """ DROP TABLE IF EXISTS ${tableName} """
    sql """ CREATE TABLE ${tableName} (
            `id` int(11) NOT NULL COMMENT "用户 ID",
            `name` varchar(65533) NOT NULL DEFAULT "yixiu" COMMENT "用户姓名",
            `score` int(11) NOT NULL COMMENT "用户得分",
            `test` int(11) NULL COMMENT "null test",
            `dft` int(11) DEFAULT "4321")
            UNIQUE KEY(`id`) DISTRIBUTED BY HASH(`id`) BUCKETS 1
            PROPERTIES("replication_num" = "1", "enable_unique_key_merge_on_write" = "true"); """

    sql """insert into ${tableName} values(2, "doris2", 2000, 223, 1),(1, "doris", 1000, 123, 1)"""
    qt_1 """ select * from ${tableName} order by id; """
    try {
        // Simulate compaction racing with the flush: the BE clears the
        // specified rowsets right after they are fetched.
        GetDebugPoint().enableDebugPointForAllBEs("_append_block_with_partial_content.clear_specified_rowsets")
        sql "set enable_unique_key_partial_update=true;"
        sql "set enable_insert_strict = false;"
        sql "sync;"
        sql """insert into ${tableName}(id,score) values(2,400),(1,200),(4,400)"""
        qt_2 """ select * from ${tableName} order by id; """
        sql "set enable_unique_key_partial_update=false;"
        sql "set enable_insert_strict = true;"
        sql "sync;"
    } finally {
        GetDebugPoint().disableDebugPointForAllBEs("_append_block_with_partial_content.clear_specified_rowsets")
    }
    sql "DROP TABLE IF EXISTS ${tableName};"


    // Case 2: same race, but on the full-row update path handled by
    // BetaRowsetWriter::_generate_delete_bitmap.
    tableName = "test_unique_key_mow_rowsets_deleted2"
    sql """ DROP TABLE IF EXISTS ${tableName} """
    sql """ CREATE TABLE ${tableName} (
            `id` int(11) NOT NULL COMMENT "用户 ID",
            `name` varchar(65533) NOT NULL DEFAULT "yixiu" COMMENT "用户姓名",
            `score` int(11) NOT NULL COMMENT "用户得分",
            `test` int(11) NULL COMMENT "null test",
            `dft` int(11) DEFAULT "4321")
            UNIQUE KEY(`id`) DISTRIBUTED BY HASH(`id`) BUCKETS 1
            PROPERTIES("replication_num" = "1", "enable_unique_key_merge_on_write" = "true"); """
    sql """insert into ${tableName} values(2, "doris2", 2000, 223, 1),(1, "doris", 1000, 123, 1)"""
    qt_3 """ select * from ${tableName} order by id; """
    try {
        GetDebugPoint().enableDebugPointForAllBEs("BetaRowsetWriter::_generate_delete_bitmap.clear_specified_rowsets")
        sql """insert into ${tableName} values(2, "doris666", 9999, 888, 7),(1, "doris333", 6666, 555, 4), (3, "doris222", 1111, 987, 567);"""
        qt_4 """ select * from ${tableName} order by id; """
    } finally {
        GetDebugPoint().disableDebugPointForAllBEs("BetaRowsetWriter::_generate_delete_bitmap.clear_specified_rowsets")
    }
    sql "DROP TABLE IF EXISTS ${tableName};"
}

regression-test/suites/unique_with_mow_p0/partial_update/test_partial_update_publish.groovy

+2-2
Original file line numberDiff line numberDiff line change
@@ -37,7 +37,7 @@ suite("test_primary_key_partial_update_publish", "p0") {
3737

3838
file '10000.csv'
3939
time 10000 // limit inflight 10s
40-
}
40+
}
4141
streamLoad {
4242
table "${tableName}"
4343

@@ -68,5 +68,5 @@ suite("test_primary_key_partial_update_publish", "p0") {
6868
"""
6969

7070
// drop drop
71-
// sql """ DROP TABLE IF EXISTS ${tableName} """
71+
sql """ DROP TABLE IF EXISTS ${tableName} """
7272
}

0 commit comments

Comments
 (0)