diff --git a/cloud/src/meta-service/meta_service.cpp b/cloud/src/meta-service/meta_service.cpp index 9da5750d8d83f9..33c109d19bcc31 100644 --- a/cloud/src/meta-service/meta_service.cpp +++ b/cloud/src/meta-service/meta_service.cpp @@ -1328,15 +1328,27 @@ void internal_get_rowset(Transaction* txn, int64_t start, int64_t end, while (it->has_next()) { auto [k, v] = it->next(); - auto rs = response->add_rowset_meta(); + auto* rs = response->add_rowset_meta(); + auto byte_size = rs->ByteSizeLong(); + TEST_SYNC_POINT_CALLBACK("get_rowset:meta_exceed_limit", &byte_size); + if (byte_size + v.size() > std::numeric_limits::max()) { + code = MetaServiceCode::PROTOBUF_PARSE_ERR; + msg = fmt::format( + "rowset meta exceeded 2G, unable to serialize, key={}. byte_size={}", + hex(k), byte_size); + LOG(WARNING) << msg; + return; + } if (!rs->ParseFromArray(v.data(), v.size())) { code = MetaServiceCode::PROTOBUF_PARSE_ERR; - msg = "malformed rowset meta, unable to deserialize"; + msg = "malformed rowset meta, unable to serialize"; LOG(WARNING) << msg << " key=" << hex(k); return; } ++num_rowsets; - if (!it->has_next()) key0 = k; + if (!it->has_next()) { + key0 = k; + } } key0.push_back('\x00'); // Update to next smallest key for iteration } while (it->more()); diff --git a/cloud/test/txn_lazy_commit_test.cpp b/cloud/test/txn_lazy_commit_test.cpp index 9a7679f3dd9e23..0f284508a3f34e 100644 --- a/cloud/test/txn_lazy_commit_test.cpp +++ b/cloud/test/txn_lazy_commit_test.cpp @@ -25,7 +25,9 @@ #include #include +#include #include +#include #include #include #include @@ -1812,4 +1814,90 @@ TEST(TxnLazyCommitTest, ConcurrentCommitTxnEventuallyCase4Test) { ASSERT_TRUE(abort_timeout_txn_hit); ASSERT_EQ(txn_id, txn_info_pb.txn_id()); } -} // namespace doris::cloud \ No newline at end of file + +TEST(TxnLazyCommitTest, RowsetMetaSizeExceedTest) { + auto txn_kv = get_mem_txn_kv(); + + int64_t db_id = 5252025; + int64_t table_id = 35201043384; + int64_t index_id = 256439; + int64_t partition_id = 732536259; + + auto meta_service = get_meta_service(txn_kv, true); + int64_t tablet_id = 25910248; + + { + create_tablet_with_db_id(meta_service.get(), db_id, table_id, index_id, partition_id, + tablet_id); + } + { + int tmp_txn_id = 0; + { + brpc::Controller cntl; + BeginTxnRequest req; + req.set_cloud_unique_id("test_cloud_unique_id"); + TxnInfoPB txn_info_pb; + txn_info_pb.set_db_id(db_id); + txn_info_pb.set_label("test_label_32ae213dasg3"); + txn_info_pb.add_table_ids(table_id); + txn_info_pb.set_timeout_ms(36000); + req.mutable_txn_info()->CopyFrom(txn_info_pb); + BeginTxnResponse res; + meta_service->begin_txn(reinterpret_cast<::google::protobuf::RpcController*>(&cntl), + &req, &res, nullptr); + ASSERT_EQ(res.status().code(), MetaServiceCode::OK); + tmp_txn_id = res.txn_id(); + ASSERT_GT(res.txn_id(), 0); + } + { + auto tmp_rowset = create_rowset(tmp_txn_id, tablet_id, partition_id); + CreateRowsetResponse res; + commit_rowset(meta_service.get(), tmp_rowset, res); + ASSERT_EQ(res.status().code(), MetaServiceCode::OK); + } + { + brpc::Controller cntl; + CommitTxnRequest req; + req.set_cloud_unique_id("test_cloud_unique_id"); + req.set_db_id(db_id); + req.set_txn_id(tmp_txn_id); + req.set_is_2pc(false); + req.set_enable_txn_lazy_commit(true); + CommitTxnResponse res; + meta_service->commit_txn(reinterpret_cast<::google::protobuf::RpcController*>(&cntl), + &req, &res, nullptr); + ASSERT_EQ(res.status().code(), MetaServiceCode::OK); + } + } + + auto* sp = SyncPoint::get_instance(); + sp->set_call_back("get_rowset:meta_exceed_limit", [](auto&& args) { + auto* byte_size = try_any_cast(args[0]); + *byte_size = std::numeric_limits::max(); + ++(*byte_size); + }); + + sp->enable_processing(); + { + brpc::Controller cntl; + GetRowsetRequest req; + req.set_cloud_unique_id("test_cloud_unique_id"); + auto* tablet_idx = req.mutable_idx(); + tablet_idx->set_table_id(table_id); + tablet_idx->set_index_id(index_id); + tablet_idx->set_partition_id(partition_id); + tablet_idx->set_tablet_id(tablet_id); + req.set_start_version(0); + req.set_end_version(-1); + req.set_cumulative_compaction_cnt(0); + req.set_base_compaction_cnt(0); + req.set_cumulative_point(2); + + GetRowsetResponse res; + meta_service->get_rowset(reinterpret_cast<::google::protobuf::RpcController*>(&cntl), &req, + &res, nullptr); + ASSERT_EQ(res.status().code(), MetaServiceCode::PROTOBUF_PARSE_ERR); + } +} + +} // namespace doris::cloud