Skip to content

Commit 4e86f9b

Browse files
authored
[improve](move-memtable) include and check offset when append data (apache#28159)
1 parent 16e232a commit 4e86f9b

9 files changed

+94
-49
lines changed

be/src/io/fs/stream_sink_file_writer.cpp

+5-5
Original file line numberDiff line numberDiff line change
@@ -44,17 +44,17 @@ Status StreamSinkFileWriter::appendv(const Slice* data, size_t data_cnt) {
4444
for (int i = 0; i < data_cnt; i++) {
4545
bytes_req += data[i].get_size();
4646
}
47-
_bytes_appended += bytes_req;
4847

4948
VLOG_DEBUG << "writer appendv, load_id: " << print_id(_load_id) << ", index_id: " << _index_id
5049
<< ", tablet_id: " << _tablet_id << ", segment_id: " << _segment_id
5150
<< ", data_length: " << bytes_req;
5251

5352
std::span<const Slice> slices {data, data_cnt};
5453
for (auto& stream : _streams) {
55-
RETURN_IF_ERROR(
56-
stream->append_data(_partition_id, _index_id, _tablet_id, _segment_id, slices));
54+
RETURN_IF_ERROR(stream->append_data(_partition_id, _index_id, _tablet_id, _segment_id,
55+
_bytes_appended, slices));
5756
}
57+
_bytes_appended += bytes_req;
5858
return Status::OK();
5959
}
6060

@@ -63,8 +63,8 @@ Status StreamSinkFileWriter::finalize() {
6363
<< ", tablet_id: " << _tablet_id << ", segment_id: " << _segment_id;
6464
// TODO(zhengyu): update get_inverted_index_file_size into stat
6565
for (auto& stream : _streams) {
66-
RETURN_IF_ERROR(
67-
stream->append_data(_partition_id, _index_id, _tablet_id, _segment_id, {}, true));
66+
RETURN_IF_ERROR(stream->append_data(_partition_id, _index_id, _tablet_id, _segment_id,
67+
_bytes_appended, {}, true));
6868
}
6969
return Status::OK();
7070
}

be/src/runtime/load_stream.cpp

+1-1
Original file line numberDiff line numberDiff line change
@@ -127,7 +127,7 @@ Status TabletStream::append_data(const PStreamHeader& header, butil::IOBuf* data
127127
DCHECK(new_segid != std::numeric_limits<uint32_t>::max());
128128
butil::IOBuf buf = data->movable();
129129
auto flush_func = [this, new_segid, eos, buf, header]() {
130-
auto st = _load_stream_writer->append_data(new_segid, buf);
130+
auto st = _load_stream_writer->append_data(new_segid, header.offset(), buf);
131131
if (eos && st.ok()) {
132132
st = _load_stream_writer->close_segment(new_segid);
133133
}

be/src/runtime/load_stream_writer.cpp

+55-16
Original file line numberDiff line numberDiff line change
@@ -83,14 +83,14 @@ Status LoadStreamWriter::init() {
8383
return Status::OK();
8484
}
8585

86-
Status LoadStreamWriter::append_data(uint32_t segid, butil::IOBuf buf) {
86+
Status LoadStreamWriter::append_data(uint32_t segid, uint64_t offset, butil::IOBuf buf) {
8787
io::FileWriter* file_writer = nullptr;
8888
{
8989
std::lock_guard lock_guard(_lock);
9090
if (!_is_init) {
9191
RETURN_IF_ERROR(init());
9292
}
93-
if (segid + 1 > _segment_file_writers.size()) {
93+
if (segid >= _segment_file_writers.size()) {
9494
for (size_t i = _segment_file_writers.size(); i <= segid; i++) {
9595
Status st;
9696
io::FileWriterPtr file_writer;
@@ -107,32 +107,70 @@ Status LoadStreamWriter::append_data(uint32_t segid, butil::IOBuf buf) {
107107
file_writer = _segment_file_writers[segid].get();
108108
}
109109
VLOG_DEBUG << " file_writer " << file_writer << "seg id " << segid;
110+
if (file_writer == nullptr) {
111+
return Status::Corruption("append_data failed, file writer {} is destoryed", segid);
112+
}
113+
if (file_writer->bytes_appended() != offset) {
114+
return Status::Corruption(
115+
"append_data out-of-order in segment={}, expected offset={}, actual={}",
116+
file_writer->path().native(), offset, file_writer->bytes_appended());
117+
}
110118
return file_writer->append(buf.to_string());
111119
}
112120

113121
Status LoadStreamWriter::close_segment(uint32_t segid) {
114-
auto st = _segment_file_writers[segid]->close();
122+
io::FileWriter* file_writer = nullptr;
123+
{
124+
std::lock_guard lock_guard(_lock);
125+
if (!_is_init) {
126+
return Status::Corruption("close_segment failed, LoadStreamWriter is not inited");
127+
}
128+
if (segid >= _segment_file_writers.size()) {
129+
return Status::Corruption("close_segment failed, segment {} is never opened", segid);
130+
}
131+
file_writer = _segment_file_writers[segid].get();
132+
}
133+
if (file_writer == nullptr) {
134+
return Status::Corruption("close_segment failed, file writer {} is destoryed", segid);
135+
}
136+
auto st = file_writer->close();
115137
if (!st.ok()) {
116138
_is_canceled = true;
117139
return st;
118140
}
119-
if (_segment_file_writers[segid]->bytes_appended() == 0) {
120-
return Status::Corruption("segment {} is zero bytes", segid);
141+
LOG(INFO) << "segment " << segid << " path " << file_writer->path().native()
142+
<< "closed, written " << file_writer->bytes_appended() << " bytes";
143+
if (file_writer->bytes_appended() == 0) {
144+
return Status::Corruption("segment {} closed with 0 bytes", file_writer->path().native());
121145
}
122-
LOG(INFO) << "segid " << segid << "path " << _segment_file_writers[segid]->path() << " written "
123-
<< _segment_file_writers[segid]->bytes_appended() << " bytes";
124146
return Status::OK();
125147
}
126148

127149
Status LoadStreamWriter::add_segment(uint32_t segid, const SegmentStatistics& stat,
128150
TabletSchemaSPtr flush_schema) {
129-
if (_segment_file_writers[segid]->bytes_appended() != stat.data_size) {
130-
LOG(WARNING) << _segment_file_writers[segid]->path() << " is incomplete, actual size: "
131-
<< _segment_file_writers[segid]->bytes_appended()
132-
<< ", expected size: " << stat.data_size;
133-
return Status::Corruption("segment {} is incomplete, actual size: {}, expected size: {}",
134-
_segment_file_writers[segid]->path().native(),
135-
_segment_file_writers[segid]->bytes_appended(), stat.data_size);
151+
io::FileWriter* file_writer = nullptr;
152+
{
153+
std::lock_guard lock_guard(_lock);
154+
if (!_is_init) {
155+
return Status::Corruption("add_segment failed, LoadStreamWriter is not inited");
156+
}
157+
if (segid >= _segment_file_writers.size()) {
158+
return Status::Corruption("add_segment failed, segment {} is never opened", segid);
159+
}
160+
file_writer = _segment_file_writers[segid].get();
161+
}
162+
if (file_writer == nullptr) {
163+
return Status::Corruption("add_segment failed, file writer {} is destoryed", segid);
164+
}
165+
if (!file_writer->is_closed()) {
166+
return Status::Corruption("add_segment failed, segment {} is not closed",
167+
file_writer->path().native());
168+
}
169+
if (file_writer->bytes_appended() != stat.data_size) {
170+
return Status::Corruption(
171+
"add_segment failed, segment stat {} does not match, file size={}, "
172+
"stat.data_size={}",
173+
file_writer->path().native(), file_writer->bytes_appended(), stat.data_size);
136174
}
137175
return _rowset_writer->add_segment(segid, stat, flush_schema);
138176
}
@@ -152,12 +190,13 @@ Status LoadStreamWriter::close() {
152190
<< "rowset builder is supposed be to initialized before close_wait() being called";
153191

154192
if (_is_canceled) {
155-
return Status::Error<ErrorCode::INTERNAL_ERROR>("flush segment failed");
193+
return Status::InternalError("flush segment failed");
156194
}
157195

158196
for (const auto& writer : _segment_file_writers) {
159197
if (!writer->is_closed()) {
160-
return Status::Corruption("segment {} is not closed", writer->path().native());
198+
return Status::Corruption("LoadStreamWriter close failed, segment {} is not closed",
199+
writer->path().native());
161200
}
162201
}
163202

be/src/runtime/load_stream_writer.h

+1-1
Original file line numberDiff line numberDiff line change
@@ -61,7 +61,7 @@ class LoadStreamWriter {
6161

6262
Status init();
6363

64-
Status append_data(uint32_t segid, butil::IOBuf buf);
64+
Status append_data(uint32_t segid, uint64_t offset, butil::IOBuf buf);
6565

6666
Status close_segment(uint32_t segid);
6767

be/src/vec/sink/load_stream_stub.cpp

+2-1
Original file line numberDiff line numberDiff line change
@@ -185,7 +185,7 @@ Status LoadStreamStub::open(BrpcClientCache<PBackendService_Stub>* client_cache,
185185

186186
// APPEND_DATA
187187
Status LoadStreamStub::append_data(int64_t partition_id, int64_t index_id, int64_t tablet_id,
188-
int64_t segment_id, std::span<const Slice> data,
188+
int64_t segment_id, uint64_t offset, std::span<const Slice> data,
189189
bool segment_eos) {
190190
PStreamHeader header;
191191
header.set_src_id(_src_id);
@@ -195,6 +195,7 @@ Status LoadStreamStub::append_data(int64_t partition_id, int64_t index_id, int64
195195
header.set_tablet_id(tablet_id);
196196
header.set_segment_id(segment_id);
197197
header.set_segment_eos(segment_eos);
198+
header.set_offset(offset);
198199
header.set_opcode(doris::PStreamHeader::APPEND_DATA);
199200
return _encode_and_send(header, data);
200201
}

be/src/vec/sink/load_stream_stub.h

+2-1
Original file line numberDiff line numberDiff line change
@@ -162,7 +162,8 @@ class LoadStreamStub {
162162
// APPEND_DATA
163163
Status
164164
append_data(int64_t partition_id, int64_t index_id, int64_t tablet_id,
165-
int64_t segment_id, std::span<const Slice> data, bool segment_eos = false);
165+
int64_t segment_id, uint64_t offset, std::span<const Slice> data,
166+
bool segment_eos = false);
166167

167168
// ADD_SEGMENT
168169
Status add_segment(int64_t partition_id, int64_t index_id, int64_t tablet_id,

be/test/io/fs/stream_sink_file_writer_test.cpp

+3-1
Original file line numberDiff line numberDiff line change
@@ -57,18 +57,20 @@ class StreamSinkFileWriterTest : public testing::Test {
5757

5858
// APPEND_DATA
5959
virtual Status append_data(int64_t partition_id, int64_t index_id, int64_t tablet_id,
60-
int64_t segment_id, std::span<const Slice> data,
60+
int64_t segment_id, uint64_t offset, std::span<const Slice> data,
6161
bool segment_eos = false) override {
6262
EXPECT_EQ(PARTITION_ID, partition_id);
6363
EXPECT_EQ(INDEX_ID, index_id);
6464
EXPECT_EQ(TABLET_ID, tablet_id);
6565
EXPECT_EQ(SEGMENT_ID, segment_id);
6666
if (segment_eos) {
6767
EXPECT_EQ(0, data.size());
68+
EXPECT_EQ(DATA0.length() + DATA1.length(), offset);
6869
} else {
6970
EXPECT_EQ(2, data.size());
7071
EXPECT_EQ(DATA0, data[0].to_string());
7172
EXPECT_EQ(DATA1, data[1].to_string());
73+
EXPECT_EQ(0, offset);
7274
}
7375
g_num_request++;
7476
return Status::OK();

be/test/runtime/load_stream_test.cpp

+24-23
Original file line numberDiff line numberDiff line change
@@ -513,8 +513,8 @@ class LoadStreamMgrTest : public testing::Test {
513513
}
514514

515515
void write_one_tablet(MockSinkClient& client, UniqueId load_id, uint32_t sender_id,
516-
int64_t index_id, int64_t tablet_id, uint32_t segid, std::string& data,
517-
bool segment_eos) {
516+
int64_t index_id, int64_t tablet_id, uint32_t segid, uint64_t offset,
517+
std::string& data, bool segment_eos) {
518518
// append data
519519
butil::IOBuf append_buf;
520520
PStreamHeader header;
@@ -527,6 +527,7 @@ class LoadStreamMgrTest : public testing::Test {
527527
header.set_segment_eos(segment_eos);
528528
header.set_src_id(sender_id);
529529
header.set_partition_id(NORMAL_PARTITION_ID);
530+
header.set_offset(offset);
530531
size_t hdr_len = header.ByteSizeLong();
531532
append_buf.append((char*)&hdr_len, sizeof(size_t));
532533
append_buf.append(header.SerializeAsString());
@@ -539,27 +540,27 @@ class LoadStreamMgrTest : public testing::Test {
539540

540541
void write_normal(MockSinkClient& client) {
541542
write_one_tablet(client, NORMAL_LOAD_ID, NORMAL_SENDER_ID, NORMAL_INDEX_ID,
542-
NORMAL_TABLET_ID, 0, NORMAL_STRING, true);
543+
NORMAL_TABLET_ID, 0, 0, NORMAL_STRING, true);
543544
}
544545

545546
void write_abnormal_load(MockSinkClient& client) {
546547
write_one_tablet(client, ABNORMAL_LOAD_ID, NORMAL_SENDER_ID, NORMAL_INDEX_ID,
547-
NORMAL_TABLET_ID, 0, ABNORMAL_STRING, true);
548+
NORMAL_TABLET_ID, 0, 0, ABNORMAL_STRING, true);
548549
}
549550

550551
void write_abnormal_index(MockSinkClient& client) {
551552
write_one_tablet(client, NORMAL_LOAD_ID, NORMAL_SENDER_ID, ABNORMAL_INDEX_ID,
552-
NORMAL_TABLET_ID, 0, ABNORMAL_STRING, true);
553+
NORMAL_TABLET_ID, 0, 0, ABNORMAL_STRING, true);
553554
}
554555

555556
void write_abnormal_sender(MockSinkClient& client) {
556557
write_one_tablet(client, NORMAL_LOAD_ID, ABNORMAL_SENDER_ID, NORMAL_INDEX_ID,
557-
NORMAL_TABLET_ID, 0, ABNORMAL_STRING, true);
558+
NORMAL_TABLET_ID, 0, 0, ABNORMAL_STRING, true);
558559
}
559560

560561
void write_abnormal_tablet(MockSinkClient& client) {
561562
write_one_tablet(client, NORMAL_LOAD_ID, NORMAL_SENDER_ID, NORMAL_INDEX_ID,
562-
ABNORMAL_TABLET_ID, 0, ABNORMAL_STRING, true);
563+
ABNORMAL_TABLET_ID, 0, 0, ABNORMAL_STRING, true);
563564
}
564565

565566
void wait_for_ack(int32_t num) {
@@ -710,7 +711,7 @@ TEST_F(LoadStreamMgrTest, one_client_abnormal_load) {
710711
EXPECT_EQ(g_response_stat.num, 2);
711712
EXPECT_EQ(g_response_stat.success_tablet_ids.size(), 0);
712713
EXPECT_EQ(g_response_stat.failed_tablet_ids.size(), 1);
713-
EXPECT_EQ(g_response_stat.success_tablet_ids[0], NORMAL_TABLET_ID);
714+
EXPECT_EQ(g_response_stat.failed_tablet_ids[0], NORMAL_TABLET_ID);
714715

715716
// server will close stream on CLOSE_LOAD
716717
wait_for_close();
@@ -820,7 +821,7 @@ TEST_F(LoadStreamMgrTest, one_client_one_index_one_tablet_single_segment0_zero_b
820821
PStreamHeader header;
821822
std::string data;
822823
write_one_tablet(client, NORMAL_LOAD_ID, NORMAL_SENDER_ID, NORMAL_INDEX_ID, NORMAL_TABLET_ID, 0,
823-
data, true);
824+
0, data, true);
824825

825826
EXPECT_EQ(g_response_stat.num, 0);
826827
// CLOSE_LOAD
@@ -861,9 +862,9 @@ TEST_F(LoadStreamMgrTest, one_client_one_index_one_tablet_single_segment0) {
861862
PStreamHeader header;
862863
std::string data = "file1 hello world 123 !@#$%^&*()_+";
863864
write_one_tablet(client, NORMAL_LOAD_ID, NORMAL_SENDER_ID, NORMAL_INDEX_ID, NORMAL_TABLET_ID, 0,
864-
data, false);
865+
0, data, false);
865866
write_one_tablet(client, NORMAL_LOAD_ID, NORMAL_SENDER_ID, NORMAL_INDEX_ID, NORMAL_TABLET_ID, 0,
866-
data, true);
867+
data.length(), data, true);
867868

868869
EXPECT_EQ(g_response_stat.num, 0);
869870
// CLOSE_LOAD
@@ -907,7 +908,7 @@ TEST_F(LoadStreamMgrTest, one_client_one_index_one_tablet_single_segment_without
907908
PStreamHeader header;
908909
std::string data = "file1 hello world 123 !@#$%^&*()_+";
909910
write_one_tablet(client, NORMAL_LOAD_ID, NORMAL_SENDER_ID, NORMAL_INDEX_ID, NORMAL_TABLET_ID, 0,
910-
data, false);
911+
0, data, false);
911912

912913
EXPECT_EQ(g_response_stat.num, 0);
913914
// CLOSE_LOAD
@@ -948,9 +949,9 @@ TEST_F(LoadStreamMgrTest, one_client_one_index_one_tablet_single_segment1) {
948949
PStreamHeader header;
949950
std::string data = "file1 hello world 123 !@#$%^&*()_+";
950951
write_one_tablet(client, NORMAL_LOAD_ID, NORMAL_SENDER_ID, NORMAL_INDEX_ID, NORMAL_TABLET_ID, 1,
951-
data, false);
952+
0, data, false);
952953
write_one_tablet(client, NORMAL_LOAD_ID, NORMAL_SENDER_ID, NORMAL_INDEX_ID, NORMAL_TABLET_ID, 1,
953-
data, true);
954+
data.length(), data, true);
954955

955956
EXPECT_EQ(g_response_stat.num, 0);
956957
// CLOSE_LOAD
@@ -991,13 +992,13 @@ TEST_F(LoadStreamMgrTest, one_client_one_index_one_tablet_two_segment) {
991992
PStreamHeader header;
992993
std::string data1 = "file1 hello world 123 !@#$%^&*()_+1";
993994
write_one_tablet(client, NORMAL_LOAD_ID, NORMAL_SENDER_ID, NORMAL_INDEX_ID, NORMAL_TABLET_ID, 0,
994-
data1, false);
995+
0, data1, false);
995996
std::string empty;
996997
write_one_tablet(client, NORMAL_LOAD_ID, NORMAL_SENDER_ID, NORMAL_INDEX_ID, NORMAL_TABLET_ID, 0,
997-
empty, true);
998+
data1.length(), empty, true);
998999
std::string data2 = "file1 hello world 123 !@#$%^&*()_+2";
9991000
write_one_tablet(client, NORMAL_LOAD_ID, NORMAL_SENDER_ID, NORMAL_INDEX_ID, NORMAL_TABLET_ID, 1,
1000-
data2, true);
1001+
0, data2, true);
10011002

10021003
EXPECT_EQ(g_response_stat.num, 0);
10031004
// CLOSE_LOAD
@@ -1044,12 +1045,12 @@ TEST_F(LoadStreamMgrTest, one_client_one_index_three_tablet) {
10441045
PStreamHeader header;
10451046
std::string data1 = "file1 hello world 123 !@#$%^&*()_+1";
10461047
write_one_tablet(client, NORMAL_LOAD_ID, NORMAL_SENDER_ID, NORMAL_INDEX_ID,
1047-
NORMAL_TABLET_ID + 0, 0, data1, true);
1048+
NORMAL_TABLET_ID + 0, 0, 0, data1, true);
10481049
write_one_tablet(client, NORMAL_LOAD_ID, NORMAL_SENDER_ID, NORMAL_INDEX_ID,
1049-
NORMAL_TABLET_ID + 1, 0, data1, true);
1050+
NORMAL_TABLET_ID + 1, 0, 0, data1, true);
10501051
std::string data2 = "file1 hello world 123 !@#$%^&*()_+2";
10511052
write_one_tablet(client, NORMAL_LOAD_ID, NORMAL_SENDER_ID, NORMAL_INDEX_ID,
1052-
NORMAL_TABLET_ID + 2, 0, data2, true);
1053+
NORMAL_TABLET_ID + 2, 0, 0, data2, true);
10531054

10541055
EXPECT_EQ(g_response_stat.num, 0);
10551056
// CLOSE_LOAD
@@ -1113,7 +1114,7 @@ TEST_F(LoadStreamMgrTest, two_client_one_index_one_tablet_three_segment) {
11131114
std::string data1 =
11141115
"sender_id=" + std::to_string(i) + ",segid=" + std::to_string(segid);
11151116
write_one_tablet(clients[i], NORMAL_LOAD_ID, NORMAL_SENDER_ID + i, NORMAL_INDEX_ID,
1116-
NORMAL_TABLET_ID, segid, data1, true);
1117+
NORMAL_TABLET_ID, segid, 0, data1, true);
11171118
segment_data[i * 3 + segid] = data1;
11181119
LOG(INFO) << "segment_data[" << i * 3 + segid << "]" << data1;
11191120
}
@@ -1186,7 +1187,7 @@ TEST_F(LoadStreamMgrTest, two_client_one_close_before_the_other_open) {
11861187
for (int32_t segid = 2; segid >= 0; segid--) {
11871188
int i = 0;
11881189
write_one_tablet(clients[i], NORMAL_LOAD_ID, NORMAL_SENDER_ID + i, NORMAL_INDEX_ID,
1189-
NORMAL_TABLET_ID, segid, segment_data[i * 3 + segid], true);
1190+
NORMAL_TABLET_ID, segid, 0, segment_data[i * 3 + segid], true);
11901191
}
11911192

11921193
EXPECT_EQ(g_response_stat.num, 0);
@@ -1205,7 +1206,7 @@ TEST_F(LoadStreamMgrTest, two_client_one_close_before_the_other_open) {
12051206
for (int32_t segid = 2; segid >= 0; segid--) {
12061207
int i = 1;
12071208
write_one_tablet(clients[i], NORMAL_LOAD_ID, NORMAL_SENDER_ID + i, NORMAL_INDEX_ID,
1208-
NORMAL_TABLET_ID, segid, segment_data[i * 3 + segid], true);
1209+
NORMAL_TABLET_ID, segid, 0, segment_data[i * 3 + segid], true);
12091210
}
12101211

12111212
close_load(clients[1], 1);

gensrc/proto/internal_service.proto

+1
Original file line numberDiff line numberDiff line change
@@ -791,6 +791,7 @@ message PStreamHeader {
791791
optional SegmentStatisticsPB segment_statistics = 9;
792792
repeated PTabletID tablets = 10;
793793
optional TabletSchemaPB flush_schema = 11;
794+
optional uint64 offset = 12;
794795
}
795796

796797
message PGetWalQueueSizeRequest{

0 commit comments

Comments
 (0)