Skip to content

Commit d611e71

Browse files
branch-3.0: [fix](meta-service) Avoid protobuf fatal error when rowset meta exceeds 2G #44780 (#45205)
Cherry-picked from #44780 Co-authored-by: Siyang Tang <tangsiyang2001@foxmail.com>
1 parent 0964b6d commit d611e71

File tree

2 files changed

+104
-4
lines changed

2 files changed

+104
-4
lines changed

cloud/src/meta-service/meta_service.cpp

+15-3
Original file line numberDiff line numberDiff line change
@@ -1328,15 +1328,27 @@ void internal_get_rowset(Transaction* txn, int64_t start, int64_t end,
13281328

13291329
while (it->has_next()) {
13301330
auto [k, v] = it->next();
1331-
auto rs = response->add_rowset_meta();
1331+
auto* rs = response->add_rowset_meta();
1332+
auto byte_size = rs->ByteSizeLong();
1333+
TEST_SYNC_POINT_CALLBACK("get_rowset:meta_exceed_limit", &byte_size);
1334+
if (byte_size + v.size() > std::numeric_limits<int32_t>::max()) {
1335+
code = MetaServiceCode::PROTOBUF_PARSE_ERR;
1336+
msg = fmt::format(
1337+
"rowset meta exceeded 2G, unable to serialize, key={}. byte_size={}",
1338+
hex(k), byte_size);
1339+
LOG(WARNING) << msg;
1340+
return;
1341+
}
13321342
if (!rs->ParseFromArray(v.data(), v.size())) {
13331343
code = MetaServiceCode::PROTOBUF_PARSE_ERR;
1334-
msg = "malformed rowset meta, unable to deserialize";
1344+
msg = "malformed rowset meta, unable to serialize";
13351345
LOG(WARNING) << msg << " key=" << hex(k);
13361346
return;
13371347
}
13381348
++num_rowsets;
1339-
if (!it->has_next()) key0 = k;
1349+
if (!it->has_next()) {
1350+
key0 = k;
1351+
}
13401352
}
13411353
key0.push_back('\x00'); // Update to next smallest key for iteration
13421354
} while (it->more());

cloud/test/txn_lazy_commit_test.cpp

+89-1
Original file line numberDiff line numberDiff line change
@@ -25,7 +25,9 @@
2525

2626
#include <atomic>
2727
#include <condition_variable>
28+
#include <cstddef>
2829
#include <cstdint>
30+
#include <limits>
2931
#include <memory>
3032
#include <random>
3133
#include <string>
@@ -1812,4 +1814,90 @@ TEST(TxnLazyCommitTest, ConcurrentCommitTxnEventuallyCase4Test) {
18121814
ASSERT_TRUE(abort_timeout_txn_hit);
18131815
ASSERT_EQ(txn_id, txn_info_pb.txn_id());
18141816
}
1815-
} // namespace doris::cloud
1817+
1818+
// Verifies that get_rowset reports PROTOBUF_PARSE_ERR when the rowset meta size is
// considered to exceed the int32 maximum (the "2G" limit named in the error message).
// The oversized condition is simulated via the "get_rowset:meta_exceed_limit" sync point,
// so no real multi-gigabyte payload has to be built.
TEST(TxnLazyCommitTest, RowsetMetaSizeExceedTest) {
    auto txn_kv = get_mem_txn_kv();

    int64_t db_id = 5252025;
    int64_t table_id = 35201043384;
    int64_t index_id = 256439;
    int64_t partition_id = 732536259;

    auto meta_service = get_meta_service(txn_kv, true);
    int64_t tablet_id = 25910248;

    create_tablet_with_db_id(meta_service.get(), db_id, table_id, index_id, partition_id,
                             tablet_id);

    // Prepare one committed rowset so that get_rowset has something to iterate over.
    {
        // Transaction ids are 64-bit; keep the full width instead of narrowing to int.
        int64_t txn_id = 0;
        {
            // Begin a transaction on the table.
            brpc::Controller cntl;
            BeginTxnRequest req;
            req.set_cloud_unique_id("test_cloud_unique_id");
            TxnInfoPB txn_info_pb;
            txn_info_pb.set_db_id(db_id);
            txn_info_pb.set_label("test_label_32ae213dasg3");
            txn_info_pb.add_table_ids(table_id);
            txn_info_pb.set_timeout_ms(36000);
            req.mutable_txn_info()->CopyFrom(txn_info_pb);
            BeginTxnResponse res;
            meta_service->begin_txn(reinterpret_cast<::google::protobuf::RpcController*>(&cntl),
                                    &req, &res, nullptr);
            ASSERT_EQ(res.status().code(), MetaServiceCode::OK);
            txn_id = res.txn_id();
            ASSERT_GT(res.txn_id(), 0);
        }
        {
            // Write a temporary rowset under that transaction.
            auto tmp_rowset = create_rowset(txn_id, tablet_id, partition_id);
            CreateRowsetResponse res;
            commit_rowset(meta_service.get(), tmp_rowset, res);
            ASSERT_EQ(res.status().code(), MetaServiceCode::OK);
        }
        {
            // Commit the transaction with lazy commit enabled.
            brpc::Controller cntl;
            CommitTxnRequest req;
            req.set_cloud_unique_id("test_cloud_unique_id");
            req.set_db_id(db_id);
            req.set_txn_id(txn_id);
            req.set_is_2pc(false);
            req.set_enable_txn_lazy_commit(true);
            CommitTxnResponse res;
            meta_service->commit_txn(reinterpret_cast<::google::protobuf::RpcController*>(&cntl),
                                     &req, &res, nullptr);
            ASSERT_EQ(res.status().code(), MetaServiceCode::OK);
        }
    }

    // Force the size seen by get_rowset to be one past int32 max so the limit check trips.
    auto* sp = SyncPoint::get_instance();
    sp->set_call_back("get_rowset:meta_exceed_limit", [](auto&& args) {
        auto* byte_size = try_any_cast<size_t*>(args[0]);
        *byte_size = std::numeric_limits<int32_t>::max();
        ++(*byte_size);
    });

    sp->enable_processing();
    {
        brpc::Controller cntl;
        GetRowsetRequest req;
        req.set_cloud_unique_id("test_cloud_unique_id");
        auto* tablet_idx = req.mutable_idx();
        tablet_idx->set_table_id(table_id);
        tablet_idx->set_index_id(index_id);
        tablet_idx->set_partition_id(partition_id);
        tablet_idx->set_tablet_id(tablet_id);
        req.set_start_version(0);
        req.set_end_version(-1);
        req.set_cumulative_compaction_cnt(0);
        req.set_base_compaction_cnt(0);
        req.set_cumulative_point(2);

        GetRowsetResponse res;
        meta_service->get_rowset(reinterpret_cast<::google::protobuf::RpcController*>(&cntl), &req,
                                 &res, nullptr);

        // Restore the process-wide sync point state before asserting, so that neither a pass
        // nor a failure leaks the size-override callback into tests that run afterwards.
        sp->clear_all_call_backs();
        sp->disable_processing();

        ASSERT_EQ(res.status().code(), MetaServiceCode::PROTOBUF_PARSE_ERR);
    }
}
1902+
1903+
} // namespace doris::cloud

0 commit comments

Comments
 (0)