diff --git a/src/meta/meta_bulk_load_service.cpp b/src/meta/meta_bulk_load_service.cpp
index 5cac916f1b..c1035c8777 100644
--- a/src/meta/meta_bulk_load_service.cpp
+++ b/src/meta/meta_bulk_load_service.cpp
@@ -1538,8 +1538,127 @@ void bulk_load_service::do_continue_app_bulk_load(
     const std::unordered_map &pinfo_map,
     const std::unordered_set &different_status_pidx_set)
 {
-    // TODO(heyuchen): TBD
-    // continue bulk load
+    const int32_t app_id = ainfo.app_id;
+    const int32_t partition_count = ainfo.partition_count;
+    const auto app_status = ainfo.status;
+    const int32_t different_count = different_status_pidx_set.size();
+    const int32_t same_count = pinfo_map.size() - different_count;
+    const int32_t invalid_count = partition_count - pinfo_map.size();
+
+    ddebug_f(
+        "app({}) continue bulk load, app_id = {}, partition_count = {}, status = {}, there are {} "
+        "partitions have bulk_load_info, {} partitions have same status with app, {} "
+        "partitions different",
+        ainfo.app_name,
+        app_id,
+        partition_count,
+        dsn::enum_to_string(app_status),
+        pinfo_map.size(),
+        same_count,
+        different_count);
+
+    // _apps_in_progress_count is used to drive the app bulk load status: when it drops to 0, the
+    // app bulk load status can transfer to the next stage, for example, when app status is
+    // downloaded and _apps_in_progress_count = 0, app status can turn to ingesting
+    // see more in function `update_partition_status_on_remote_storage_reply`
+    int32_t in_progress_partition_count = partition_count;
+    if (app_status == bulk_load_status::BLS_DOWNLOADING) {
+        if (invalid_count > 0) {
+            // missing partitions will be created, so the in_progress_count should be invalid_count
+            in_progress_partition_count = invalid_count;
+        } else if (different_count > 0) {
+            // it is hard to tell whether bulk load was downloading normally or had rolled back
+            // to downloading before meta server crashed; when app status is downloading, we
+            // treat bulk load as rolling back to downloading for convenience: partitions whose
+            // status is not downloading are updated to downloading, so the in_progress_count
+            // should be different_count
+            in_progress_partition_count = different_count;
+        }
+    } else if (app_status == bulk_load_status::BLS_DOWNLOADED ||
+               app_status == bulk_load_status::BLS_INGESTING ||
+               app_status == bulk_load_status::BLS_SUCCEED) {
+        // when app status is downloaded, app status will turn to ingesting after all partitions
+        // turn to ingesting, so the in_progress_count should be same_count; the same rule applies
+        // to ingesting and succeed
+        in_progress_partition_count = same_count;
+    } // for other cases, in_progress_count should be partition_count
+    {
+        zauto_write_lock l(_lock);
+        _apps_in_progress_count[app_id] = in_progress_partition_count;
+    }
+
+    // if app is paused, no need to send bulk_load_request, just return
+    if (app_status == bulk_load_status::BLS_PAUSED) {
+        return;
+    }
+
+    // create all missing partitions; requests are sent to all partitions once they are created
+    if (app_status == bulk_load_status::BLS_DOWNLOADING && invalid_count > 0) {
+        for (auto i = 0; i < partition_count; ++i) {
+            if (pinfo_map.find(i) == pinfo_map.end()) {
+                create_missing_partition_dir(ainfo.app_name, gpid(app_id, i), partition_count);
+            }
+        }
+        return;
+    }
+
+    // update partitions whose status differs from app status to app_status
+    if ((app_status == bulk_load_status::BLS_FAILED ||
+         app_status == bulk_load_status::BLS_CANCELED ||
+         app_status == bulk_load_status::BLS_PAUSING ||
+         app_status == bulk_load_status::BLS_DOWNLOADING) &&
+        different_count > 0) {
+        for (auto pidx : different_status_pidx_set) {
+            update_partition_status_on_remote_storage(
+                ainfo.app_name, gpid(app_id, pidx), app_status);
+        }
+    }
+
+    // send bulk_load_request to all partitions, and start ingestion if app is ingesting
+    for (auto i = 0; i < partition_count; ++i) {
+        gpid pid = gpid(app_id, i);
+        partition_bulk_load(ainfo.app_name, pid);
+        if (app_status == bulk_load_status::BLS_INGESTING) {
+            tasking::enqueue(
+                LPC_BULK_LOAD_INGESTION,
+                _meta_svc->tracker(),
+                std::bind(&bulk_load_service::partition_ingestion, this, ainfo.app_name, pid));
+        }
+    }
+}
+
+// ThreadPool: THREAD_POOL_META_STATE
+void bulk_load_service::create_missing_partition_dir(const std::string &app_name,
+                                                     const gpid &pid,
+                                                     int32_t partition_count)
+{
+    partition_bulk_load_info pinfo;
+    pinfo.status = bulk_load_status::BLS_DOWNLOADING;
+    blob value = dsn::json::json_forwarder::encode(pinfo);
+
+    _meta_svc->get_meta_storage()->create_node(
+        get_partition_bulk_load_path(pid),
+        std::move(value),
+        [app_name, pid, partition_count, pinfo, this]() {
+            const int32_t app_id = pid.get_app_id();
+            bool send_request = false; // set when the last missing partition has been created
+            ddebug_f("app({}) create partition({}) bulk_load_info", app_name, pid);
+            {
+                zauto_write_lock l(_lock);
+                _partition_bulk_load_info[pid] = pinfo;
+
+                if (--_apps_in_progress_count[app_id] == 0) {
+                    _apps_in_progress_count[app_id] = partition_count;
+                    send_request = true;
+                }
+            }
+            if (send_request) {
+                ddebug_f("app({}) start to bulk load", app_name);
+                for (auto i = 0; i < partition_count; ++i) {
+                    partition_bulk_load(app_name, gpid(app_id, i));
+                }
+            }
+        });
 }
 
 } // namespace replication
diff --git a/src/meta/meta_bulk_load_service.h b/src/meta/meta_bulk_load_service.h
index 17316b1836..68479b2295 100644
--- a/src/meta/meta_bulk_load_service.h
+++ b/src/meta/meta_bulk_load_service.h
@@ -246,6 +246,13 @@ class bulk_load_service
         const std::unordered_map &pinfo_map,
         const std::unordered_set &different_status_pidx_set);
 
+    // called by `do_continue_app_bulk_load`
+    // only used when app status is downloading and the bulk load info of some partitions does
+    // not exist on remote storage; creates the missing info with downloading status
+    void create_missing_partition_dir(const std::string &app_name,
+                                      const gpid &pid,
+                                      int32_t partition_count);
+
     ///
     /// helper functions
     ///
diff --git a/src/meta/test/meta_bulk_load_service_test.cpp b/src/meta/test/meta_bulk_load_service_test.cpp
index 75a0e3a787..34c2c83e98 100644
--- a/src/meta/test/meta_bulk_load_service_test.cpp
+++
b/src/meta/test/meta_bulk_load_service_test.cpp
@@ -892,107 +892,289 @@ TEST_F(bulk_load_failover_test, app_info_inconsistency)
     ASSERT_FALSE(app_is_bulk_loading(SYNC_APP_NAME));
 }
 
-// app:download, partition[0]=downloaded, partition[1~3] not existed
-TEST_F(bulk_load_failover_test, lack_of_partition_with_downloaded)
+TEST_F(bulk_load_failover_test, app_downloading_test)
 {
-    _pstatus_map[0] = bulk_load_status::BLS_DOWNLOADED;
-    try_to_continue_bulk_load(bulk_load_status::BLS_DOWNLOADED);
-    ASSERT_FALSE(app_is_bulk_loading(SYNC_APP_NAME));
-}
-
-// app:ingesting, all partition not exist
-TEST_F(bulk_load_failover_test, lack_of_partition_with_ingestion)
-{
-    try_to_continue_bulk_load(bulk_load_status::BLS_INGESTING);
-    ASSERT_FALSE(app_is_bulk_loading(SYNC_APP_NAME));
-}
-
-// app:succeed, partition[0,1]=succeed, partition[2~3] not existed
-TEST_F(bulk_load_failover_test, lack_of_partition_with_succeed)
-{
-    mock_pstatus_map(bulk_load_status::BLS_SUCCEED, 1);
-    try_to_continue_bulk_load(bulk_load_status::BLS_SUCCEED);
-    ASSERT_FALSE(app_is_bulk_loading(SYNC_APP_NAME));
-}
-
-// app:failed, partition[0~2]=failed, partition[3] not existed
-TEST_F(bulk_load_failover_test, lack_of_partition_with_failed)
-{
-    mock_pstatus_map(bulk_load_status::BLS_FAILED, 2);
-    try_to_continue_bulk_load(bulk_load_status::BLS_FAILED);
-    ASSERT_FALSE(app_is_bulk_loading(SYNC_APP_NAME));
-}
-
-// app:pausing, partition[0]=pausing, partition[1~3] not existed
-TEST_F(bulk_load_failover_test, lack_of_partition_with_pausing)
-{
-    _pstatus_map[0] = bulk_load_status::BLS_PAUSING;
-    try_to_continue_bulk_load(bulk_load_status::BLS_PAUSING);
-    ASSERT_FALSE(app_is_bulk_loading(SYNC_APP_NAME));
+    // Test cases (app status is downloading):
+    // - partition[0,1]=downloading, partition[2,3] do not exist
+    // - partition[0,1]=downloading, partition[2]=downloaded, partition[3] does not exist
+    // - partition[0~3]=downloading
+    // - partition[0~3]=downloaded
+    // - partition[0]=downloaded, partition[1~3]=downloading
+    // - partition[0~3]=succeed
+    struct app_downloading_test
+    {
+        int32_t start_index; // partitions [start_index, end_index] are mocked with pstatus
+        int32_t end_index;
+        bulk_load_status::type pstatus;
+        int32_t downloaded_pidx; // partition overridden to downloaded, -1 for none
+        bool expected_is_bulk_loading;
+        int32_t expected_in_process_count;
+    } tests[] = {{0, 1, bulk_load_status::BLS_DOWNLOADING, -1, true, SYNC_PARTITION_COUNT},
+                 {0, 1, bulk_load_status::BLS_DOWNLOADING, 2, false, 0},
+                 {0, 3, bulk_load_status::BLS_DOWNLOADING, -1, true, SYNC_PARTITION_COUNT},
+                 {0, 3, bulk_load_status::BLS_DOWNLOADED, -1, true, SYNC_PARTITION_COUNT},
+                 {1, 3, bulk_load_status::BLS_DOWNLOADING, 0, true, SYNC_PARTITION_COUNT},
+                 {0, 3, bulk_load_status::BLS_SUCCEED, -1, true, SYNC_PARTITION_COUNT}};
+
+    for (const auto &test : tests) {
+        SetUp();
+        mock_pstatus_map(test.pstatus, test.end_index, test.start_index);
+        if (test.downloaded_pidx >= 0) { // index 0 is a valid partition, only -1 means none
+            _pstatus_map[test.downloaded_pidx] = bulk_load_status::BLS_DOWNLOADED;
+        }
+        try_to_continue_bulk_load(bulk_load_status::BLS_DOWNLOADING);
+        ASSERT_EQ(app_is_bulk_loading(SYNC_APP_NAME), test.expected_is_bulk_loading);
+        if (test.expected_is_bulk_loading) {
+            ASSERT_EQ(get_app_bulk_load_status(SYNC_APP_ID), bulk_load_status::BLS_DOWNLOADING);
+            ASSERT_EQ(get_app_in_process_count(SYNC_APP_ID), test.expected_in_process_count);
+        }
+        TearDown();
+    }
 }
 
-// app:paused, partition[0,1]=paused, partition[2,3] not existed
-TEST_F(bulk_load_failover_test, lack_of_partition_with_paused)
+TEST_F(bulk_load_failover_test, app_downloaded_test)
 {
-    mock_pstatus_map(bulk_load_status::BLS_PAUSED, 1);
-    try_to_continue_bulk_load(bulk_load_status::BLS_PAUSED);
-    ASSERT_FALSE(app_is_bulk_loading(SYNC_APP_NAME));
+    // Test cases (app status is downloaded):
+    // - partition[0]=downloaded, partition[1~3] do not exist
+    // - partition[0]=ingesting, partition[1~3]=succeed
+    // - partition[0~3]=downloaded
+    // - partition[0~3]=ingesting
+    // - partition[0~2]=downloaded, partition[3]=ingesting
+    struct app_downloaded_test
+    {
+        int32_t start_index; // partitions [start_index, end_index] are mocked with pstatus
+        int32_t end_index;
+        bulk_load_status::type pstatus;
+        int32_t ingesting_pidx; // partition overridden to ingesting, -1 for none
+        bool expected_is_bulk_loading;
+        int32_t expected_in_process_count;
+    } tests[] = {{0, 0, bulk_load_status::BLS_DOWNLOADED, -1, false, 0},
+                 {1, 3, bulk_load_status::BLS_SUCCEED, 0, false, 0},
+                 {0, 3, bulk_load_status::BLS_DOWNLOADED, -1, true, SYNC_PARTITION_COUNT},
+                 {0, 3, bulk_load_status::BLS_INGESTING, -1, true, 0},
+                 {0, 2, bulk_load_status::BLS_DOWNLOADED, 3, true, 3}};
+
+    for (const auto &test : tests) {
+        SetUp();
+        mock_pstatus_map(test.pstatus, test.end_index, test.start_index);
+        if (test.ingesting_pidx >= 0) { // index 0 is a valid partition, only -1 means none
+            _pstatus_map[test.ingesting_pidx] = bulk_load_status::BLS_INGESTING;
+        }
+        try_to_continue_bulk_load(bulk_load_status::BLS_DOWNLOADED);
+        ASSERT_EQ(app_is_bulk_loading(SYNC_APP_NAME), test.expected_is_bulk_loading);
+        if (test.expected_is_bulk_loading) {
+            ASSERT_EQ(get_app_bulk_load_status(SYNC_APP_ID), bulk_load_status::BLS_DOWNLOADED);
+            ASSERT_EQ(get_app_in_process_count(SYNC_APP_ID), test.expected_in_process_count);
+        }
+        TearDown();
+    }
 }
 
-// app:cancel, partition[0~2]=pausing, partition[3] not existed
-TEST_F(bulk_load_failover_test, lack_of_partition_with_cancel)
+TEST_F(bulk_load_failover_test, app_ingesting_test)
 {
-    mock_pstatus_map(bulk_load_status::BLS_CANCELED, 2);
-    try_to_continue_bulk_load(bulk_load_status::BLS_CANCELED);
-    ASSERT_FALSE(app_is_bulk_loading(SYNC_APP_NAME));
+    // Test cases (app status is ingesting):
+    // - no partition exists
+    // - partition[0~2]=ingesting, partition[3]=downloading
+    // - partition[0~3]=ingesting
+    // - partition[0~3]=succeed
+    // - partition[0~2]=succeed, partition[3]=ingesting
+    struct app_ingesting_test
+    {
+        int32_t end_index; // partitions [0, end_index] are mocked with pstatus
+        bulk_load_status::type pstatus;
+        bulk_load_status::type p3_status; // status of partition[3], BLS_INVALID for no override
+        bool expected_is_bulk_loading;
+        int32_t expected_in_process_count;
+    } tests[] = {{-1, bulk_load_status::BLS_INVALID, bulk_load_status::BLS_INVALID, false, 0},
+                 {2, bulk_load_status::BLS_INGESTING, bulk_load_status::BLS_DOWNLOADING, false, 0},
+                 {3,
+                  bulk_load_status::BLS_INGESTING,
+                  bulk_load_status::BLS_INVALID,
+                  true,
+                  SYNC_PARTITION_COUNT},
+                 {3, bulk_load_status::BLS_SUCCEED, bulk_load_status::BLS_INVALID, true, 0},
+                 {2, bulk_load_status::BLS_SUCCEED, bulk_load_status::BLS_INGESTING, true, 1}};
+
+    for (const auto &test : tests) {
+        SetUp();
+        mock_pstatus_map(test.pstatus, test.end_index, 0);
+        if (test.p3_status != bulk_load_status::BLS_INVALID) {
+            _pstatus_map[3] = test.p3_status;
+        }
+        try_to_continue_bulk_load(bulk_load_status::BLS_INGESTING);
+        ASSERT_EQ(app_is_bulk_loading(SYNC_APP_NAME), test.expected_is_bulk_loading);
+        if (test.expected_is_bulk_loading) {
+            ASSERT_EQ(get_app_bulk_load_status(SYNC_APP_ID), bulk_load_status::BLS_INGESTING);
+            ASSERT_EQ(get_app_in_process_count(SYNC_APP_ID), test.expected_in_process_count);
+        }
+        TearDown();
+    }
 }
 
-// app:downloading, partition[0,1]=downloading, partition[2]=downloaded, partition[3] not exist
-TEST_F(bulk_load_failover_test, downloading_with_partition_wrong_status)
+TEST_F(bulk_load_failover_test, app_succeed_test)
 {
-    mock_pstatus_map(bulk_load_status::BLS_DOWNLOADING, 1);
-    _pstatus_map[2] = bulk_load_status::BLS_DOWNLOADED;
-    try_to_continue_bulk_load(bulk_load_status::BLS_DOWNLOADING);
-    ASSERT_FALSE(app_is_bulk_loading(SYNC_APP_NAME));
+    // Test cases (app status is succeed):
+    // - partition[0~2]=succeed, partition[3] does not exist
+    // - partition[0~2]=succeed, partition[3]=failed
+    // - partition[0~3]=succeed
+    struct app_succeed_test
+    {
+        bulk_load_status::type p3_status; // status of partition[3], BLS_INVALID for no override
+        bool expected_is_bulk_loading;
+    } tests[] = {{bulk_load_status::BLS_INVALID, false},
+                 {bulk_load_status::BLS_FAILED, false},
+                 {bulk_load_status::BLS_SUCCEED, true}};
+
+    for (const auto &test : tests) {
+        SetUp();
+        mock_pstatus_map(bulk_load_status::BLS_SUCCEED, 2, 0);
+        if (test.p3_status != bulk_load_status::BLS_INVALID) {
+            _pstatus_map[3] = test.p3_status;
+        }
+        try_to_continue_bulk_load(bulk_load_status::BLS_SUCCEED);
+        ASSERT_EQ(app_is_bulk_loading(SYNC_APP_NAME), test.expected_is_bulk_loading);
+        if (test.expected_is_bulk_loading) {
+            ASSERT_EQ(get_app_bulk_load_status(SYNC_APP_ID), bulk_load_status::BLS_SUCCEED);
+            ASSERT_EQ(get_app_in_process_count(SYNC_APP_ID), SYNC_PARTITION_COUNT);
+        }
+        TearDown();
+    }
 }
 
-// app:downloaded, partition[0]=succeed, partition[1~3]=ingesting
-TEST_F(bulk_load_failover_test, downloaded_with_partition_wrong_status)
+TEST_F(bulk_load_failover_test, app_pausing_test)
 {
-    _pstatus_map[0] = bulk_load_status::BLS_SUCCEED;
-    mock_pstatus_map(bulk_load_status::BLS_INGESTING, SYNC_PARTITION_COUNT - 1, 1);
-    try_to_continue_bulk_load(bulk_load_status::BLS_DOWNLOADED);
-    ASSERT_FALSE(app_is_bulk_loading(SYNC_APP_NAME));
+    // Test cases (app status is pausing):
+    // - partition[0]=pausing, partition[1~3] do not exist
+    // - partition[0]=downloading, partition[1]=downloaded, partition[2]=pausing,
+    //   partition[3]=paused
+    // - partition[0~3]=pausing
+    // - partition[0]=pausing, partition[1~3]=paused
+    struct app_pausing_test
+    {
+        bool mixed_status;
+        int32_t start_index; // partitions [start_index, 3] are mocked with pstatus when > 0
+        bulk_load_status::type pstatus;
+        bool expected_is_bulk_loading;
+    } tests[] = {{false, -1, bulk_load_status::type::BLS_PAUSING, false},
+                 {true, -1, bulk_load_status::type::BLS_PAUSING, true},
+                 {false, 1, bulk_load_status::type::BLS_PAUSING, true},
+                 {false, 1, bulk_load_status::type::BLS_PAUSED, true}};
+    for (const auto &test : tests) {
+        SetUp();
+        if (test.mixed_status) {
+            _pstatus_map[0] = bulk_load_status::BLS_DOWNLOADING;
+            _pstatus_map[1] = bulk_load_status::BLS_DOWNLOADED;
+            _pstatus_map[2] = bulk_load_status::BLS_PAUSING;
+            _pstatus_map[3] = bulk_load_status::BLS_PAUSED;
+        } else {
+            _pstatus_map[0] = bulk_load_status::BLS_PAUSING;
+            if (test.start_index > 0) {
+                mock_pstatus_map(test.pstatus, 3, test.start_index);
+            }
+        }
+        try_to_continue_bulk_load(bulk_load_status::BLS_PAUSING);
+        ASSERT_EQ(app_is_bulk_loading(SYNC_APP_NAME), test.expected_is_bulk_loading);
+        if (test.expected_is_bulk_loading) {
+            ASSERT_EQ(get_app_bulk_load_status(SYNC_APP_ID), bulk_load_status::BLS_PAUSING);
+            ASSERT_EQ(get_app_in_process_count(SYNC_APP_ID), SYNC_PARTITION_COUNT);
+        }
+        TearDown();
+    }
 }
 
-// app:ingesting, partition[0]=downloading, partition[1~3]=ingesting
-TEST_F(bulk_load_failover_test, ingesting_with_partition_wrong_status)
+TEST_F(bulk_load_failover_test, app_paused_test)
 {
-    _pstatus_map[0] = bulk_load_status::BLS_DOWNLOADING;
-    mock_pstatus_map(bulk_load_status::BLS_INGESTING, SYNC_PARTITION_COUNT - 1, 1);
-    try_to_continue_bulk_load(bulk_load_status::BLS_INGESTING);
-    ASSERT_FALSE(app_is_bulk_loading(SYNC_APP_NAME));
+    // Test cases (app status is paused):
+    // - partition[0~2]=paused, partition[3] does not exist
+    // - partition[0~2]=paused, partition[3]=pausing
+    // - partition[0~3]=paused
+    struct app_paused_test
+    {
+        bulk_load_status::type p3_status; // status of partition[3], BLS_INVALID for no override
+        bool expected_is_bulk_loading;
+        int32_t expected_in_process_count; // not read below, SYNC_PARTITION_COUNT is asserted
+    } tests[] = {{bulk_load_status::BLS_INVALID, false},
+                 {bulk_load_status::BLS_PAUSING, false},
+                 {bulk_load_status::BLS_PAUSED, true}};
+
+    for (const auto &test : tests) {
+        SetUp();
+        mock_pstatus_map(bulk_load_status::BLS_PAUSED, 2, 0);
+        if (test.p3_status != bulk_load_status::BLS_INVALID) {
+            _pstatus_map[3] = test.p3_status;
+        }
+        try_to_continue_bulk_load(bulk_load_status::BLS_PAUSED);
+        ASSERT_EQ(app_is_bulk_loading(SYNC_APP_NAME), test.expected_is_bulk_loading);
+        if (test.expected_is_bulk_loading) {
+            ASSERT_EQ(get_app_bulk_load_status(SYNC_APP_ID), bulk_load_status::BLS_PAUSED);
+            ASSERT_EQ(get_app_in_process_count(SYNC_APP_ID), SYNC_PARTITION_COUNT);
+        }
+        TearDown();
+    }
 }
 
-// app:succeed, partition[0~2]=succeed, partition[3]=failed
-TEST_F(bulk_load_failover_test, succeed_with_partition_wrong_status)
+TEST_F(bulk_load_failover_test, app_failed_test)
 {
-    mock_pstatus_map(bulk_load_status::BLS_SUCCEED, 2);
-    _pstatus_map[3] = bulk_load_status::BLS_FAILED;
-    try_to_continue_bulk_load(bulk_load_status::BLS_SUCCEED);
-    ASSERT_FALSE(app_is_bulk_loading(SYNC_APP_NAME));
+    // Test cases (app status is failed):
+    // - partition[0~2]=failed, partition[3] does not exist
+    // - partition[0~3]=failed
+    // - partition[0,1]=downloading, partition[2]=downloaded, partition[3]=failed
+    struct app_failed_test
+    {
+        bool mixed_status;
+        int32_t end_index; // partitions [0, end_index] are mocked as failed when not mixed
+        bool expected_is_bulk_loading;
+    } tests[] = {{false, 2, false}, {false, 3, true}, {true, -1, true}};
+    for (const auto &test : tests) {
+        SetUp();
+        if (test.mixed_status) {
+            _pstatus_map[0] = bulk_load_status::BLS_DOWNLOADING;
+            _pstatus_map[1] = bulk_load_status::BLS_DOWNLOADING;
+            _pstatus_map[2] = bulk_load_status::BLS_DOWNLOADED;
+            _pstatus_map[3] = bulk_load_status::BLS_FAILED;
+        } else {
+            mock_pstatus_map(bulk_load_status::BLS_FAILED, test.end_index, 0);
+        }
+        try_to_continue_bulk_load(bulk_load_status::BLS_FAILED);
+        ASSERT_EQ(app_is_bulk_loading(SYNC_APP_NAME), test.expected_is_bulk_loading);
+        if (test.expected_is_bulk_loading) {
+            ASSERT_EQ(get_app_bulk_load_status(SYNC_APP_ID), bulk_load_status::BLS_FAILED);
+            ASSERT_EQ(get_app_in_process_count(SYNC_APP_ID), SYNC_PARTITION_COUNT);
+        }
+        TearDown();
+    }
 }
 
-// app:paused, partition[0~2]=paused, partition[3]=pausing
-TEST_F(bulk_load_failover_test, paused_with_wrong_status)
+TEST_F(bulk_load_failover_test, app_cancel_test)
 {
-    mock_pstatus_map(bulk_load_status::BLS_PAUSED, 2);
-    _pstatus_map[3] = bulk_load_status::BLS_PAUSING;
-    try_to_continue_bulk_load(bulk_load_status::BLS_PAUSED);
-    ASSERT_FALSE(app_is_bulk_loading(SYNC_APP_NAME));
+    // Test cases (app status is canceled):
+    // - partition[0~2]=pausing, partition[3] does not exist
+    // - partition[0~3]=canceled
+    // - partition[0~2]=ingesting, partition[3]=downloaded
+    struct app_cancel_test
+    {
+        bulk_load_status::type pstatus; // status of partition[0~2]
+        bulk_load_status::type p3_status; // status of partition[3], BLS_INVALID for no override
+        bool expected_is_bulk_loading;
+    } tests[] = {
+        {bulk_load_status::type::BLS_PAUSING, bulk_load_status::type::BLS_INVALID, false},
+        {bulk_load_status::type::BLS_CANCELED, bulk_load_status::type::BLS_CANCELED, true},
+        {bulk_load_status::type::BLS_INGESTING, bulk_load_status::type::BLS_DOWNLOADED, true}};
+    for (const auto &test : tests) {
+        SetUp();
+        mock_pstatus_map(test.pstatus, 2, 0);
+        if (test.p3_status != bulk_load_status::type::BLS_INVALID) {
+            _pstatus_map[3] = test.p3_status;
+        }
+        try_to_continue_bulk_load(bulk_load_status::BLS_CANCELED);
+        ASSERT_EQ(app_is_bulk_loading(SYNC_APP_NAME), test.expected_is_bulk_loading);
+        if (test.expected_is_bulk_loading) {
+            ASSERT_EQ(get_app_bulk_load_status(SYNC_APP_ID), bulk_load_status::BLS_CANCELED);
+            ASSERT_EQ(get_app_in_process_count(SYNC_APP_ID), SYNC_PARTITION_COUNT);
+        }
+        TearDown();
+    }
 }
 
-// TODO(heyuchen): add other tests
-
 } // namespace replication
 } // namespace dsn