Skip to content

Commit

Permalink
feat(bulk-load): handle recoverable errors during bulk load (#535)
Browse files Browse the repository at this point in the history
  • Loading branch information
hycdong authored Jul 9, 2020
1 parent 4a20c17 commit f14e7e6
Show file tree
Hide file tree
Showing 4 changed files with 62 additions and 10 deletions.
5 changes: 3 additions & 2 deletions src/dist/replication/lib/bulk_load/replica_bulk_loader.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -233,9 +233,10 @@ error_code replica_bulk_loader::do_bulk_load(const std::string &app_name,

switch (meta_status) {
case bulk_load_status::BLS_DOWNLOADING:
// TODO(heyuchen): add restart downloading status check
if (local_status == bulk_load_status::BLS_INVALID ||
local_status == bulk_load_status::BLS_PAUSED) {
local_status == bulk_load_status::BLS_PAUSED ||
local_status == bulk_load_status::BLS_INGESTING ||
local_status == bulk_load_status::BLS_SUCCEED) {
ec = start_download(app_name, cluster_name, provider_name);
}
break;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,13 @@ class replica_bulk_loader_test : public replica_test_base
case bulk_load_status::BLS_PAUSED:
mock_group_progress(bulk_load_status::BLS_DOWNLOADING, 30, 100, 100);
break;
case bulk_load_status::BLS_INGESTING:
mock_group_ingestion_states(
ingestion_status::IS_SUCCEED, ingestion_status::IS_SUCCEED, true);
break;
case bulk_load_status::BLS_SUCCEED:
mock_group_cleanup_flag(bulk_load_status::BLS_SUCCEED);
break;
default:
return;
}
Expand Down Expand Up @@ -481,12 +488,12 @@ TEST_F(replica_bulk_loader_test, start_downloading_test)
TEST_F(replica_bulk_loader_test, rollback_to_downloading_test)
{
fail::cfg("replica_bulk_loader_download_sst_files", "return()");

// TODO(heyuchen): add other status
struct test_struct
{
bulk_load_status::type status;
} tests[]{{bulk_load_status::BLS_PAUSED}};
} tests[]{{bulk_load_status::BLS_PAUSED},
{bulk_load_status::BLS_INGESTING},
{bulk_load_status::BLS_SUCCEED}};

for (auto test : tests) {
test_rollback_to_downloading(test.status);
Expand Down
15 changes: 13 additions & 2 deletions src/dist/replication/meta_server/meta_bulk_load_service.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -705,8 +705,19 @@ void bulk_load_service::handle_app_pausing(const bulk_load_response &response,
// ThreadPool: THREAD_POOL_META_STATE
//
// Invoked when a replica meets an error during bulk load: asks for the app status on
// remote storage to be set back to BLS_DOWNLOADING so that bulk load can be retried.
//
// The rollback is requested only while the app is in BLS_DOWNLOADING, BLS_DOWNLOADED,
// BLS_INGESTING or BLS_SUCCEED; for any other status the call only writes a log line.
//
// \param app_name  app name, used only in the log message
// \param pid       partition that reported the error; only its app id is used
//
// NOTE(review): only a read lock on _lock is held while calling
// update_app_status_on_remote_storage_unlocked - confirm that the callee does not
// modify state protected by _lock.
void bulk_load_service::try_rollback_to_downloading(const std::string &app_name, const gpid &pid)
{
    zauto_read_lock read_guard(_lock);
    const auto current_status = get_app_bulk_load_status_unlocked(pid.get_app_id());

    switch (current_status) {
    // every status listed here is allowed to fall back to downloading
    case bulk_load_status::BLS_DOWNLOADING:
    case bulk_load_status::BLS_DOWNLOADED:
    case bulk_load_status::BLS_INGESTING:
    case bulk_load_status::BLS_SUCCEED:
        update_app_status_on_remote_storage_unlocked(pid.get_app_id(),
                                                     bulk_load_status::type::BLS_DOWNLOADING);
        break;
    default:
        ddebug_f("app({}) status={}, no need to rollback to downloading, wait for next round",
                 app_name,
                 dsn::enum_to_string(current_status));
        break;
    }
}

// ThreadPool: THREAD_POOL_META_STATE
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -125,6 +125,11 @@ class bulk_load_service_test : public meta_test_base
bulk_svc().reset_local_bulk_load_states(app_id, app_name);
}

// Test accessor: returns the entry of the bulk load service's
// _apps_in_progress_count that is stored under the given app id.
// NOTE(review): the lookup uses operator[]; if the container is map-like this
// presumably creates a default entry for an unknown app_id - confirm.
int32_t get_app_in_process_count(int32_t app_id)
{
    const int32_t in_progress_count = bulk_svc()._apps_in_progress_count[app_id];
    return in_progress_count;
}

public:
int32_t APP_ID = 1;
std::string APP_NAME = "bulk_load_test";
Expand Down Expand Up @@ -479,11 +484,39 @@ TEST_F(bulk_load_process_test, pause_succeed)
ASSERT_EQ(get_app_bulk_load_status(_app_id), bulk_load_status::BLS_PAUSED);
}

// TODO(heyuchen): add other unit tests for `on_partition_bulk_load_reply`
// Checks the reply handler when the bulk load RPC itself fails: the reply is
// delivered with ERR_TIMEOUT while the mocked context and the request both use
// BLS_DOWNLOADED. Afterwards the app is expected to report BLS_DOWNLOADING and
// the in-progress count is expected to equal _partition_count.
TEST_F(bulk_load_process_test, rpc_error)
{
// Arrange: mock the meta-side context and build the request, both with BLS_DOWNLOADED.
mock_meta_bulk_load_context(_app_id, _partition_count, bulk_load_status::BLS_DOWNLOADED);
create_request(bulk_load_status::BLS_DOWNLOADED);
// Act: hand the reply to the handler with a timeout error code.
on_partition_bulk_load_reply(ERR_TIMEOUT, _req, _resp);
// presumably blocks until the tasks triggered by the reply have finished - TODO confirm
wait_all();
// Assert: status is BLS_DOWNLOADING and every partition is counted as in progress.
ASSERT_EQ(get_app_bulk_load_status(_app_id), bulk_load_status::BLS_DOWNLOADING);
ASSERT_EQ(get_app_in_process_count(_app_id), _partition_count);
}

// Checks the reply handler for an ERR_INVALID_STATE response: the shared helper is
// driven with _partition_count, BLS_SUCCEED and ERR_INVALID_STATE, after which the
// app is expected to report BLS_DOWNLOADING with an in-progress count equal to
// _partition_count.
TEST_F(bulk_load_process_test, response_invalid_state)
{
// The helper is defined outside this view; presumably it mocks the context with the
// given status and delivers a response carrying the given error - TODO confirm.
test_on_partition_bulk_load_reply(
_partition_count, bulk_load_status::BLS_SUCCEED, ERR_INVALID_STATE);
// Expected outcome: status is BLS_DOWNLOADING and the count equals _partition_count.
ASSERT_EQ(get_app_bulk_load_status(_app_id), bulk_load_status::BLS_DOWNLOADING);
ASSERT_EQ(get_app_in_process_count(_app_id), _partition_count);
}

// Checks the reply handler for an ERR_OBJECT_NOT_FOUND response while the status
// passed to the helper is BLS_CANCELED: the app is expected to still report
// BLS_CANCELED, with an in-progress count equal to _partition_count.
TEST_F(bulk_load_process_test, response_object_not_found)
{
// The helper is defined outside this view; presumably it mocks the context with the
// given status and delivers a response carrying the given error - TODO confirm.
test_on_partition_bulk_load_reply(
_partition_count, bulk_load_status::BLS_CANCELED, ERR_OBJECT_NOT_FOUND);
// Expected outcome: status is BLS_CANCELED and the count equals _partition_count.
ASSERT_EQ(get_app_bulk_load_status(_app_id), bulk_load_status::BLS_CANCELED);
ASSERT_EQ(get_app_in_process_count(_app_id), _partition_count);
}

/// on_partition_ingestion_reply unit tests
// TODO(heyuchen):
// add ingest_rpc_error unit tests after implement function `rollback_downloading`
// Checks the ingestion reply handler when the ingestion RPC fails: the reply for
// partition (_app_id, _pidx) is delivered with ERR_TIMEOUT, after which the app is
// expected to report BLS_DOWNLOADING.
TEST_F(bulk_load_process_test, ingest_rpc_error)
{
// Arrange: the helper is defined outside this view, so the meaning of its
// arguments (ERR_OK, 1, _partition_count) is not visible here - TODO confirm.
mock_ingestion_context(ERR_OK, 1, _partition_count);
// Act: pass the ingestion response through the helper with a timeout error code.
test_on_partition_ingestion_reply(_ingestion_resp, gpid(_app_id, _pidx), ERR_TIMEOUT);
// Assert: the app status is BLS_DOWNLOADING.
ASSERT_EQ(get_app_bulk_load_status(_app_id), bulk_load_status::BLS_DOWNLOADING);
}

TEST_F(bulk_load_process_test, ingest_empty_write_error)
{
Expand Down

0 comments on commit f14e7e6

Please sign in to comment.