diff --git a/src/api/include/pdc_client_connect.h b/src/api/include/pdc_client_connect.h index acf8fa7f0..52208588b 100644 --- a/src/api/include/pdc_client_connect.h +++ b/src/api/include/pdc_client_connect.h @@ -1102,4 +1102,7 @@ perr_t PDC_Client_insert_obj_ref_into_dart(dart_hash_algo_t hash_algo, char *att */ void report_avg_server_profiling_rst(); +perr_t PDC_Client_transfer_pthread_create(); +perr_t PDC_Client_transfer_pthread_terminate(); +perr_t PDC_Client_transfer_pthread_cnt_add(int n); #endif /* PDC_CLIENT_CONNECT_H */ diff --git a/src/api/pdc_client_connect.c b/src/api/pdc_client_connect.c index b89006293..a6d05dc35 100644 --- a/src/api/pdc_client_connect.c +++ b/src/api/pdc_client_connect.c @@ -63,9 +63,10 @@ #include #include #include - #include "pdc_timing.h" +/* #define TANG_DEBUG 1 */ + int is_client_debug_g = 0; pdc_server_selection_t pdc_server_selection_g = PDC_SERVER_DEFAULT; int pdc_client_mpi_rank_g = 0; @@ -154,6 +155,7 @@ static hg_id_t data_server_write_check_register_id_g; static hg_id_t data_server_write_register_id_g; static hg_id_t server_checkpoint_rpc_register_id_g; static hg_id_t send_shm_register_id_g; +static hg_id_t send_rpc_register_id_g; // bulk static hg_id_t query_partial_register_id_g; @@ -188,7 +190,10 @@ int cache_count_g = 0; int cache_total_g = 0; pdc_data_server_io_list_t *client_cache_list_head_g = NULL; -static uint64_t object_selection_query_counter_g = 0; +static uint64_t object_selection_query_counter_g = 0; +static pthread_t hg_progress_tid_g; +static int hg_progress_flag_g = -1; // -1 thread unintialized, 0 thread created, 1 terminate thread +static int hg_progress_task_cnt_g = 0; /* * @@ -251,6 +256,108 @@ pdc_client_check_int_ret_cb(const struct hg_cb_info *callback_info) FUNC_LEAVE(ret_value); } +static void * +hg_progress_fn(void *foo) +{ + hg_return_t ret; + unsigned int actual_count; + hg_context_t *hg_context = (hg_context_t *)foo; + +#ifdef TANG_DEBUG + char cur_time[64]; + PDC_get_time_str(cur_time); + printf("[%s] enter %s \n", cur_time, __func__); +#endif + + while (hg_progress_flag_g != 1) { + do { + /* PDC_get_time_str(cur_time); */ + /* printf("[%s] before HG_Trigger\n", cur_time); */ + + ret = HG_Trigger(hg_context, 0, 1, &actual_count); + + /* PDC_get_time_str(cur_time); */ + /* printf("[%s] after HG_Trigger\n", cur_time); */ + } while ((ret == HG_SUCCESS) && actual_count && hg_progress_flag_g != 1); + + /* PDC_get_time_str(cur_time); */ + /* printf("[%s] before HG_Progress\n", cur_time); */ + + if (hg_progress_flag_g != 1) + HG_Progress(hg_context, 100); + + usleep(1000); + /* PDC_get_time_str(cur_time); */ + /* printf("[%s] after HG_Progress\n", cur_time); */ + } + +#ifdef TANG_DEBUG + PDC_get_time_str(cur_time); + printf("[%s] leaving %s\n", cur_time, __func__); +#endif + + return (NULL); +} + +perr_t +PDC_Client_transfer_pthread_create() +{ + perr_t ret_value = SUCCEED; + + FUNC_ENTER(NULL); + + if (hg_progress_flag_g == -1) { + pthread_create(&hg_progress_tid_g, NULL, hg_progress_fn, send_context_g); + hg_progress_flag_g = 0; +#ifdef TANG_DEBUG + char cur_time[64]; + PDC_get_time_str(cur_time); + printf("%s %s created pthread\n", cur_time, __func__); +#endif + } + + FUNC_LEAVE(ret_value); +} + +perr_t +PDC_Client_transfer_pthread_terminate() +{ + perr_t ret_value = SUCCEED; + + FUNC_ENTER(NULL); + + if (hg_progress_flag_g == 0 && hg_progress_task_cnt_g == 0) { + hg_progress_flag_g = 1; + pthread_join(hg_progress_tid_g, NULL); + hg_progress_flag_g = -1; +#ifdef TANG_DEBUG + char cur_time[64]; + PDC_get_time_str(cur_time); + printf("%s %s terminated pthread\n", cur_time, __func__); +#endif + } + + FUNC_LEAVE(ret_value); +} + +perr_t +PDC_Client_transfer_pthread_cnt_add(int n) +{ + perr_t ret_value = SUCCEED; + + FUNC_ENTER(NULL); + +#ifdef TANG_DEBUG + char cur_time[64]; + PDC_get_time_str(cur_time); + printf("%s %s change cnt from %d to %d\n", cur_time, __func__, hg_progress_task_cnt_g, + hg_progress_task_cnt_g + n); +#endif + hg_progress_task_cnt_g += n; + + FUNC_LEAVE(ret_value); +} + // Check if all work has been processed // Using global variable $atomic_work_todo_g perr_t @@ -271,7 +378,7 @@ PDC_Client_check_response(hg_context_t **hg_context) if (hg_atomic_get32(&atomic_work_todo_g) <= 0) break; - hg_ret = HG_Progress(*hg_context, HG_MAX_IDLE_TIME); + hg_ret = HG_Progress(*hg_context, 100); } while (hg_ret == HG_SUCCESS || hg_ret == HG_TIMEOUT); ret_value = SUCCEED; @@ -279,6 +386,15 @@ PDC_Client_check_response(hg_context_t **hg_context) FUNC_LEAVE(ret_value); } +// Block and wait for all work processed by pthread +inline static void +PDC_Client_wait_pthread_progress() +{ + while (hg_atomic_get32(&atomic_work_todo_g) > 0) { + usleep(1000); + } +} + perr_t PDC_Client_read_server_addr_from_file() { @@ -550,19 +666,30 @@ client_send_transfer_request_all_rpc_cb(const struct hg_cb_info *callback_info) FUNC_ENTER(NULL); +#ifdef TANG_DEBUG + char cur_time[64]; + PDC_get_time_str(cur_time); + printf("%s PDC_CLIENT[%d] enter %s\n", cur_time, pdc_client_mpi_rank_g, __func__); +#endif + region_transfer_args = (struct _pdc_transfer_request_all_args *)callback_info->arg; handle = callback_info->info.forward.handle; ret_value = HG_Get_output(handle, &output); if (ret_value != HG_SUCCESS) { - printf("PDC_CLIENT[%d]: client_send_transfer_request_all_rpc_cb error with HG_Get_output\n", - pdc_client_mpi_rank_g); + printf("PDC_CLIENT[%d]: %s error with HG_Get_output\n", pdc_client_mpi_rank_g, __func__); region_transfer_args->ret = -1; goto done; } region_transfer_args->ret = output.ret; region_transfer_args->metadata_id = output.metadata_id; + + /* // Tang */ + /* region_transfer_args->ret = 1; */ + /* printf("PDC_CLIENT[%d]: %s ret %d meta id %llu\n", */ + /* pdc_client_mpi_rank_g, __func__, output.ret, output.metadata_id); */ + done: fflush(stdout); hg_atomic_decr32(&atomic_work_todo_g); @@ -615,8 +742,7 @@ client_send_transfer_request_rpc_cb(const struct hg_cb_info *callback_info) ret_value = HG_Get_output(handle, &output); if (ret_value != HG_SUCCESS) { - printf("PDC_CLIENT[%d]: client_send_transfer_request_rpc_cb error with HG_Get_output\n", - pdc_client_mpi_rank_g); + printf("PDC_CLIENT[%d]: %s error with HG_Get_output\n", __func__, pdc_client_mpi_rank_g); region_transfer_args->ret = -1; goto done; } @@ -904,6 +1030,76 @@ PDC_Client_try_lookup_server(int server_id, int is_init) FUNC_LEAVE(ret_value); } +static hg_return_t +send_rpc_cb(const struct hg_cb_info *callback_info) +{ + FUNC_ENTER(NULL); + + hg_return_t ret_value; + + struct _pdc_client_lookup_args *client_lookup_args = (struct _pdc_client_lookup_args *)callback_info->arg; + hg_handle_t handle = callback_info->info.forward.handle; + + /* Get output from server*/ + metadata_add_tag_out_t output; + ret_value = HG_Get_output(handle, &output); + if (ret_value != HG_SUCCESS) { + client_lookup_args->ret = -1; + PGOTO_ERROR(HG_OTHER_ERROR, "==PDC_CLIENT[%d]: metadata_add_tag_rpc_cb error with HG_Get_output", + pdc_client_mpi_rank_g); + } + client_lookup_args->ret = output.ret; + +done: + fflush(stdout); + hg_atomic_decr32(&atomic_work_todo_g); + HG_Free_output(handle, &output); + + FUNC_LEAVE(ret_value); +} + +perr_t +PDC_Client_send_rpc(int server_id) +{ + perr_t ret_value = SUCCEED; + hg_return_t hg_ret = 0; + struct _pdc_client_lookup_args lookup_args; + hg_handle_t handle; + send_rpc_in_t in; + + FUNC_ENTER(NULL); + + if (server_id < 0 || server_id >= pdc_server_num_g) + PGOTO_ERROR(FAIL, "==PDC_CLIENT[%d]: %s invalid server ID %d", pdc_client_mpi_rank_g, __func__, + server_id); + + // Debug statistics for counting number of messages sent to each server. + debug_server_id_count[server_id]++; + + if (PDC_Client_try_lookup_server(server_id, 0) != SUCCEED) + PGOTO_ERROR(FAIL, "==CLIENT[%d]: ERROR with PDC_Client_try_lookup_server", pdc_client_mpi_rank_g); + + HG_Create(send_context_g, pdc_server_info_g[server_id].addr, send_rpc_register_id_g, &handle); + + in.value = pdc_client_mpi_rank_g; + + hg_ret = HG_Forward(handle, send_rpc_cb, &lookup_args, &in); + if (hg_ret != HG_SUCCESS) + PGOTO_ERROR(FAIL, "==PDC_CLIENT[%d]: - HG_Forward Error!", pdc_client_mpi_rank_g); + + // No need to wait + // Wait for response from server + hg_atomic_set32(&atomic_work_todo_g, 1); + + PDC_Client_check_response(&send_context_g); + +done: + fflush(stdout); + HG_Destroy(handle); + + FUNC_LEAVE(ret_value); +} + // Callback function for HG_Forward() // Gets executed after a call to HG_Trigger and the RPC has completed static hg_return_t @@ -1129,7 +1325,7 @@ PDC_Client_check_bulk(hg_context_t *hg_context) /* Do not try to make progress anymore if we're done */ if (hg_atomic_get32(&bulk_todo_g) <= 0) break; - hg_ret = HG_Progress(hg_context, HG_MAX_IDLE_TIME); + hg_ret = HG_Progress(hg_context, 100); } while (hg_ret == HG_SUCCESS || hg_ret == HG_TIMEOUT); @@ -1291,6 +1487,7 @@ PDC_Client_mercury_init(hg_class_t **hg_class, hg_context_t **hg_context, int po data_server_write_register_id_g = PDC_data_server_write_register(*hg_class); server_checkpoint_rpc_register_id_g = PDC_server_checkpoint_rpc_register(*hg_class); send_shm_register_id_g = PDC_send_shm_register(*hg_class); + send_rpc_register_id_g = PDC_send_rpc_register(*hg_class); // bulk query_partial_register_id_g = PDC_query_partial_register(*hg_class); @@ -1504,6 +1701,11 @@ PDC_Client_init() server_mem_usage_g = (int64_t *)calloc(pdc_server_num_g, sizeof(int64_t)); } + /* if (hg_progress_flag_g == -1) { */ + /* hg_progress_flag_g = 0; */ + /* pthread_create(&hg_progress_tid_g, NULL, hg_progress_fn, send_context_g); */ + /* } */ + done: fflush(stdout); FUNC_LEAVE(ret_value); @@ -1550,6 +1752,13 @@ PDC_Client_finalize() if (pdc_server_info_g != NULL) free(pdc_server_info_g); + // Terminate thread + if (hg_progress_flag_g == 0) { + hg_progress_flag_g = 1; + pthread_join(hg_progress_tid_g, NULL); + hg_progress_flag_g = -1; + } + #ifndef ENABLE_MPI for (i = 0; i < pdc_server_num_g; i++) { printf(" Server%3d, %d\n", i, debug_server_id_count[i]); @@ -3014,20 +3223,28 @@ PDC_Client_transfer_request_all(int n_objs, pdc_access_t access_type, uint32_t d int i; hg_handle_t client_send_transfer_request_all_handle; struct _pdc_transfer_request_all_args transfer_args; + char cur_time[64]; FUNC_ENTER(NULL); + +#ifdef TANG_DEBUG + PDC_get_time_str(cur_time); + printf("%s PDC_CLIENT[%d] enter %s\n", cur_time, pdc_client_mpi_rank_g, __func__); +#endif + #ifdef PDC_TIMING double start = MPI_Wtime(), end; double function_start = start; #endif if (!(access_type == PDC_WRITE || access_type == PDC_READ)) { ret_value = FAIL; - printf("Invalid PDC type in function PDC_Client_transfer_request_all @ %d\n", __LINE__); + printf("Invalid PDC type in function %s @ %d\n", __func__, __LINE__); goto done; } in.n_objs = n_objs; in.access_type = access_type; in.total_buf_size = bulk_size; + in.client_id = pdc_client_mpi_rank_g; // Compute metadata server id // meta_server_id = PDC_get_server_by_obj_id(obj_id[0], pdc_server_num_g); @@ -3047,12 +3264,25 @@ PDC_Client_transfer_request_all(int n_objs, pdc_access_t access_type, uint32_t d hg_ret = HG_Bulk_create(hg_class, 1, (void **)&bulk_buf, &bulk_size, HG_BULK_READWRITE, &(in.local_bulk_handle)); if (hg_ret != HG_SUCCESS) - PGOTO_ERROR(FAIL, - "PDC_Client_transfer_request_all(): Could not create local bulk data handle @ line %d\n", - __LINE__); + PGOTO_ERROR(FAIL, "%s: Could not create local bulk data handle @ line %d\n", __func__, __LINE__); + + hg_atomic_set32(&atomic_work_todo_g, 1); hg_ret = HG_Forward(client_send_transfer_request_all_handle, client_send_transfer_request_all_rpc_cb, &transfer_args, &in); + +#ifdef TANG_DEBUG + PDC_get_time_str(cur_time); + printf("%s PDC_CLIENT[%d] %s: forwarded to %d\n", cur_time, pdc_client_mpi_rank_g, __func__, + data_server_id); +#endif + +#ifdef ENABLE_MPI + MPI_Barrier(MPI_COMM_WORLD); +#endif + + PDC_Client_transfer_pthread_create(); + #ifdef PDC_TIMING if (access_type == PDC_READ) { pdc_timings.PDCtransfer_request_start_all_read_rpc += MPI_Wtime() - start; @@ -3066,8 +3296,24 @@ PDC_Client_transfer_request_all(int n_objs, pdc_access_t access_type, uint32_t d if (hg_ret != HG_SUCCESS) PGOTO_ERROR(FAIL, "PDC_Client_send_transfer_request_all(): Could not start HG_Forward() @ line %d\n", __LINE__); - hg_atomic_set32(&atomic_work_todo_g, 1); - PDC_Client_check_response(&send_context_g); + + /* if (hg_progress_flag_g == -1) { */ + /* pthread_create(&hg_progress_tid_g, NULL, hg_progress_fn, send_context_g); */ + /* hg_progress_flag_g = 0; */ + /* } */ + + /* PDC_Client_check_response(&send_context_g); */ + + PDC_Client_wait_pthread_progress(); + +#ifdef TANG_DEBUG + PDC_get_time_str(cur_time); + printf("%s PDC_CLIENT[%d] %s: received response\n", cur_time, pdc_client_mpi_rank_g, __func__); +#endif + +#ifdef ENABLE_MPI + MPI_Barrier(MPI_COMM_WORLD); +#endif #ifdef PDC_TIMING end = MPI_Wtime(); @@ -3081,8 +3327,7 @@ PDC_Client_transfer_request_all(int n_objs, pdc_access_t access_type, uint32_t d } #endif /* - printf("PDC_Client_transfer_request() checkpoint, first value is %d @ line %d\n", ((int *)buf)[0], - __LINE__); + printf("%s checkpoint, first value is %d @ line %d\n", __func__,((int *)buf)[0], __LINE__); */ for (i = 0; i < n_objs; ++i) { metadata_id[i] = transfer_args.metadata_id + i; @@ -3091,6 +3336,12 @@ PDC_Client_transfer_request_all(int n_objs, pdc_access_t access_type, uint32_t d PGOTO_ERROR(FAIL, "PDC_CLIENT: transfer request failed... @ line %d\n", __LINE__); HG_Destroy(client_send_transfer_request_all_handle); + +#ifdef TANG_DEBUG + PDC_get_time_str(cur_time); + printf("%s PDC_CLIENT[%d] leave %s\n", cur_time, pdc_client_mpi_rank_g, __func__); +#endif + done: fflush(stdout); FUNC_LEAVE(ret_value); @@ -3116,8 +3367,8 @@ PDC_Client_transfer_request_metadata_query2(char *buf, uint64_t total_buf_size, in.total_buf_size = total_buf_size; // Compute metadata server id - // fprintf(stderr, "PDC_Client_transfer_request_metadata_query2[%d]: metdata_id = %u, total_buf_size = - // %lu\n", pdc_client_mpi_rank_g, metadata_server_id, total_buf_size); + // fprintf(stderr, "%s [%d]: metdata_id = %u, total_buf_size = + // %lu\n", __func__, pdc_client_mpi_rank_g, metadata_server_id, total_buf_size); debug_server_id_count[metadata_server_id]++; hg_class = HG_Context_get_class(send_context_g); @@ -3134,10 +3385,7 @@ PDC_Client_transfer_request_metadata_query2(char *buf, uint64_t total_buf_size, hg_ret = HG_Bulk_create(hg_class, 1, (void **)&buf, (hg_size_t *)&(in.total_buf_size), HG_BULK_READWRITE, &(in.local_bulk_handle)); if (hg_ret != HG_SUCCESS) - PGOTO_ERROR(FAIL, - "PDC_Client_transfer_request_metadata_query2(): Could not create local bulk data handle " - "@ line %d\n", - __LINE__); + PGOTO_ERROR(FAIL, "%s: Could not create local bulk data handle @ line %d\n", __func__, __LINE__); hg_ret = HG_Forward(client_send_transfer_request_metadata_query2_handle, client_send_transfer_request_metadata_query2_rpc_cb, &transfer_args, &in); @@ -3205,10 +3453,7 @@ PDC_Client_transfer_request_metadata_query(char *buf, uint64_t total_buf_size, i hg_ret = HG_Bulk_create(hg_class, 1, (void **)&buf, (hg_size_t *)&(in.total_buf_size), HG_BULK_READWRITE, &(in.local_bulk_handle)); if (hg_ret != HG_SUCCESS) - PGOTO_ERROR(FAIL, - "PDC_Client_transfer_request_metadata_query(): Could not create local bulk data handle @ " - "line %d\n", - __LINE__); + PGOTO_ERROR(FAIL, "%s: Could not create local bulk data handle @ line %d\n", __func__, __LINE__); hg_ret = HG_Forward(client_send_transfer_request_metadata_query_handle, client_send_transfer_request_metadata_query_rpc_cb, &transfer_args, &in); @@ -3233,7 +3478,7 @@ PDC_Client_transfer_request_metadata_query(char *buf, uint64_t total_buf_size, i pdc_timings.PDCtransfer_request_metadata_query_rpc += end - start; pdc_timestamp_register(pdc_client_transfer_request_metadata_query_timestamps, function_start, end); #endif - // fprintf(stderr, "PDC_Client_transfer_request_metadata_query: checkpoint %d\n", __LINE__); + // fprintf(stderr, "%s: checkpoint %d\n", __func__, __LINE__); done: fflush(stdout); @@ -3249,8 +3494,16 @@ PDC_Client_transfer_request_wait_all(int n_objs, pdcid_t *transfer_request_id, u hg_class_t * hg_class; hg_handle_t client_send_transfer_request_wait_all_handle; struct _pdc_transfer_request_wait_all_args transfer_args; + char cur_time[64]; FUNC_ENTER(NULL); + + /* if (hg_progress_flag_g == 0) { */ + /* hg_progress_flag_g = 1; */ + /* pthread_join(hg_progress_tid_g, NULL); */ + /* hg_progress_flag_g = -1; */ + /* } */ + #ifdef PDC_TIMING double start = MPI_Wtime(), end; double function_start = start; @@ -3276,10 +3529,7 @@ PDC_Client_transfer_request_wait_all(int n_objs, pdcid_t *transfer_request_id, u hg_ret = HG_Bulk_create(hg_class, 1, (void **)&transfer_request_id, (hg_size_t *)&(in.total_buf_size), HG_BULK_READWRITE, &(in.local_bulk_handle)); if (hg_ret != HG_SUCCESS) - PGOTO_ERROR( - FAIL, - "PDC_Client_transfer_request_wait_all(): Could not create local bulk data handle @ line %d\n", - __LINE__); + PGOTO_ERROR(FAIL, "%s: Could not create local bulk data handle @ line %d\n", __func__, __LINE__); hg_ret = HG_Forward(client_send_transfer_request_wait_all_handle, client_send_transfer_request_wait_all_rpc_cb, &transfer_args, &in); @@ -3297,9 +3547,7 @@ PDC_Client_transfer_request_wait_all(int n_objs, pdcid_t *transfer_request_id, u /* - printf("PDC_Client_transfer_request() checkpoint, first value is %d @ line %d\n", ((int *)buf)[0], - - __LINE__); + printf("%s checkpoint, first value is %d @ line %d\n", ((int *)buf)[0], __func__, __LINE__); */ if (transfer_args.ret != 1) PGOTO_ERROR(FAIL, "PDC_CLIENT: transfer request wait all failed... @ line %d\n", __LINE__); @@ -3332,6 +3580,7 @@ PDC_Client_transfer_request(void *buf, pdcid_t obj_id, uint32_t data_server_id, int i; hg_handle_t client_send_transfer_request_handle; struct _pdc_transfer_request_args transfer_args; + char cur_time[64]; FUNC_ENTER(NULL); #ifdef PDC_TIMING @@ -3340,11 +3589,11 @@ PDC_Client_transfer_request(void *buf, pdcid_t obj_id, uint32_t data_server_id, #endif if (!(access_type == PDC_WRITE || access_type == PDC_READ)) { ret_value = FAIL; - printf("Invalid PDC type in function PDC_Client_transfer_request @ %d\n", __LINE__); + printf("Invalid PDC type in function %s @ %d\n", __func__, __LINE__); goto done; } - // printf("rank = %d, PDC_Client_transfer_request_start data_server_id = %u\n", pdc_client_mpi_rank_g, + // printf("rank = %d, %s data_server_id = %u\n", pdc_client_mpi_rank_g, __func__, // data_server_id); in.access_type = access_type; in.remote_unit = unit; @@ -3388,9 +3637,9 @@ PDC_Client_transfer_request(void *buf, pdcid_t obj_id, uint32_t data_server_id, &(in.local_bulk_handle)); if (hg_ret != HG_SUCCESS) - PGOTO_ERROR(FAIL, - "PDC_Client_transfer_request(): Could not create local bulk data handle @ line %d\n", - __LINE__); + PGOTO_ERROR(FAIL, "%s: Could not create local bulk data handle @ line %d\n", __func__, __LINE__); + + hg_atomic_set32(&atomic_work_todo_g, 1); hg_ret = HG_Forward(client_send_transfer_request_handle, client_send_transfer_request_rpc_cb, &transfer_args, &in); @@ -3405,11 +3654,20 @@ PDC_Client_transfer_request(void *buf, pdcid_t obj_id, uint32_t data_server_id, start = MPI_Wtime(); #endif + PDC_Client_transfer_pthread_create(); + if (hg_ret != HG_SUCCESS) PGOTO_ERROR(FAIL, "PDC_Client_send_transfer_request(): Could not start HG_Forward() @ line %d\n", __LINE__); - hg_atomic_set32(&atomic_work_todo_g, 1); - PDC_Client_check_response(&send_context_g); + /* hg_atomic_set32(&atomic_work_todo_g, 1); */ + /* PDC_Client_check_response(&send_context_g); */ + + /* if (hg_progress_flag_g == -1) { */ + /* pthread_create(&hg_progress_tid_g, NULL, hg_progress_fn, send_context_g); */ + /* hg_progress_flag_g = 0; */ + /* } */ + + PDC_Client_wait_pthread_progress(); #ifdef PDC_TIMING end = MPI_Wtime(); @@ -3454,9 +3712,7 @@ PDC_Client_transfer_request_status(pdcid_t transfer_request_id, uint32_t data_se transfer_request_status_register_id_g, &client_send_transfer_request_status_handle); if (hg_ret != HG_SUCCESS) - PGOTO_ERROR(FAIL, - "PDC_Client_transfer_request(): Could not create local bulk data handle @ line %d\n", - __LINE__); + PGOTO_ERROR(FAIL, "%s: Could not create local bulk data handle @ line %d\n", __func__, __LINE__); hg_ret = HG_Forward(client_send_transfer_request_status_handle, client_send_transfer_request_status_rpc_cb, &transfer_args, &in); @@ -3486,8 +3742,17 @@ PDC_Client_transfer_request_wait(pdcid_t transfer_request_id, uint32_t data_serv transfer_request_wait_in_t in; hg_handle_t client_send_transfer_request_wait_handle; struct _pdc_transfer_request_wait_args transfer_args; + char cur_time[64]; FUNC_ENTER(NULL); + + // Join the thread of trasfer start + /* if (hg_progress_flag_g == 0) { */ + /* hg_progress_flag_g = 1; */ + /* pthread_join(hg_progress_tid_g, NULL); */ + /* hg_progress_flag_g = -1; */ + /* } */ + #ifdef PDC_TIMING double start = MPI_Wtime(), end; double function_start = start; @@ -3506,9 +3771,7 @@ PDC_Client_transfer_request_wait(pdcid_t transfer_request_id, uint32_t data_serv transfer_request_wait_register_id_g, &client_send_transfer_request_wait_handle); if (hg_ret != HG_SUCCESS) - PGOTO_ERROR(FAIL, - "PDC_Client_transfer_request(): Could not create local bulk data handle @ line %d\n", - __LINE__); + PGOTO_ERROR(FAIL, "%s: Could not create local bulk data handle @ line %d\n", __func__, __LINE__); hg_ret = HG_Forward(client_send_transfer_request_wait_handle, client_send_transfer_request_wait_rpc_cb, &transfer_args, &in); diff --git a/src/api/pdc_region/pdc_region_transfer.c b/src/api/pdc_region/pdc_region_transfer.c index ee4e48c8b..2d2cacf5b 100644 --- a/src/api/pdc_region/pdc_region_transfer.c +++ b/src/api/pdc_region/pdc_region_transfer.c @@ -44,6 +44,8 @@ #include "pdc_analysis_pkg.h" #include +/* #define TANG_DEBUG 1 */ + // pdc region transfer class. Contains essential information for performing non-blocking PDC client I/O // perations. typedef struct pdc_transfer_request { @@ -306,7 +308,11 @@ PDCregion_transfer_close(pdcid_t transfer_request_id) /* When the reference count reaches zero the resources are freed */ if (PDC_dec_ref(transfer_request_id) < 0) PGOTO_ERROR(FAIL, "PDC transfer request: problem of freeing id"); + done: + PDC_Client_transfer_pthread_cnt_add(-1); + PDC_Client_transfer_pthread_terminate(); + fflush(stdout); FUNC_LEAVE(ret_value); } @@ -931,9 +937,8 @@ prepare_start_all_requests(pdcid_t *transfer_request_id, int size, transferinfo = PDC_find_id(transfer_request_id[i]); transfer_request = (pdc_transfer_request *)(transferinfo->obj_ptr); if (transfer_request->metadata_id != NULL) { - printf("PDC Client PDCregion_transfer_start_all attempt to start existing transfer request @ " - "line %d\n", - __LINE__); + printf("PDC Client %d: %s attempt to start existing transfer request @ line %d\n", + pdc_client_mpi_rank_g, __func__, __LINE__); return FAIL; } if (transfer_request->consistency == PDC_CONSISTENCY_POSIX) { @@ -960,7 +965,7 @@ prepare_start_all_requests(pdcid_t *transfer_request_id, int size, &(transfer_request->sub_offsets), &(transfer_request->output_offsets), &(transfer_request->output_sizes), &(transfer_request->output_buf)); if (transfer_request->n_obj_servers == 0) { - printf("PDC_Client %d, %s: error with static region partition, no server is selected!\n", + printf("PDC_Client %d: %s error with static region partition, no server is selected!\n", pdc_client_mpi_rank_g, __func__); return FAIL; } @@ -1070,6 +1075,7 @@ prepare_start_all_requests(pdcid_t *transfer_request_id, int size, else { *write_size_ptr = 0; } + if (read_size) { read_transfer_request = (pdc_transfer_request_start_all_pkg **)malloc( sizeof(pdc_transfer_request_start_all_pkg *) * read_size); @@ -1239,6 +1245,16 @@ PDC_Client_start_all_requests(pdc_transfer_request_start_all_pkg **transfer_requ int * bulk_buf_ref; FUNC_ENTER(NULL); + +#ifdef TANG_DEBUG + char cur_time[64]; + PDC_get_time_str(cur_time); + printf("%s PDC_CLIENT[%d] enter %s\n", cur_time, pdc_client_mpi_rank_g, __func__); +#endif + + if (size == 0) + goto done; + metadata_id = (uint64_t *)malloc(sizeof(uint64_t) * size); read_bulk_buf = (char **)malloc(sizeof(char *) * size); index = 0; @@ -1253,6 +1269,8 @@ PDC_Client_start_all_requests(pdc_transfer_request_start_all_pkg **transfer_requ bulk_buf_ref[0] = n_objs; // printf("checkpoint @ line %d, index = %d, dataserver_id = %d, n_objs = %d\n", __LINE__, index, // transfer_requests[index]->data_server_id, n_objs); + /* printf("==PDC_CLIENT[%d]: %s 1 send to server %d\n", pdc_client_mpi_rank_g, __func__, + * transfer_requests[index]->data_server_id); */ PDC_Client_transfer_request_all(n_objs, transfer_requests[index]->transfer_request->access_type, transfer_requests[index]->data_server_id, bulk_buf, bulk_buf_size, metadata_id + index); @@ -1288,9 +1306,12 @@ PDC_Client_start_all_requests(pdc_transfer_request_start_all_pkg **transfer_requ bulk_buf_ref[0] = n_objs; // printf("checkpoint @ line %d, index = %d, dataserver_id = %d, n_objs = %d\n", __LINE__, index, // transfer_requests[index]->data_server_id, n_objs); + /* printf("==PDC_CLIENT[%d]: %s 2 send to server %d\n", pdc_client_mpi_rank_g, __func__, + * transfer_requests[index]->data_server_id); */ PDC_Client_transfer_request_all(n_objs, transfer_requests[index]->transfer_request->access_type, transfer_requests[index]->data_server_id, bulk_buf, bulk_buf_size, metadata_id + index); + // printf("transfer request towards data server %d\n", transfer_requests[index]->data_server_id); for (j = index; j < size; ++j) { // All requests share the same bulk buffer, reference counter is also shared among all @@ -1308,9 +1329,11 @@ PDC_Client_start_all_requests(pdc_transfer_request_start_all_pkg **transfer_requ // transfer_requests[j]->transfer_request->obj_id, transfer_requests[j]->index, metadata_id[j]); } } + free(read_bulk_buf); free(metadata_id); +done: fflush(stdout); FUNC_LEAVE(ret_value); } @@ -1324,13 +1347,21 @@ PDCregion_transfer_start_all(pdcid_t *transfer_request_id, int size) pdcid_t * posix_transfer_request_id; FUNC_ENTER(NULL); + +#ifdef TANG_DEBUG + char cur_time[64]; + PDC_get_time_str(cur_time); + printf("%s PDC_CLIENT[%d] enter %s\n", cur_time, pdc_client_mpi_rank_g, __func__); +#endif + // Split write and read requests. Handle them separately. - // printf("PDCregion_transfer_start_all: checkpoint %d\n", __LINE__); + // printf("%s: checkpoint %d\n", __func__, __LINE__); + // [Tang] NOTE: prepare_start_all_requests include several metadata RPC operations ret_value = prepare_start_all_requests(transfer_request_id, size, &write_transfer_requests, &read_transfer_requests, &write_size, &read_size, &posix_transfer_request_id, &posix_size); /* - printf("PDCregion_transfer_start_all: checkpoint %d, write_size = %d, read_size = %d\n", __LINE__, + printf("%s: checkpoint %d, write_size = %d, read_size = %d\n", __func__, __LINE__, write_size, read_size); int i; for ( i = 0; i < read_size; ++i ) { @@ -1338,25 +1369,46 @@ PDCregion_transfer_start_all(pdcid_t *transfer_request_id, int size) read_transfer_requests[i]->data_server_id, read_transfer_requests[i]->transfer_request->obj_id); } */ + PDC_Client_transfer_pthread_cnt_add(size); + /* PDC_Client_transfer_pthread_create(); */ + +#ifdef ENABLE_MPI + // [Tang] TODO: change to user provided comm + /* MPI_Comm world_comm; */ + /* MPI_Comm_dup(MPI_COMM_WORLD, &world_comm); */ + /* MPI_Barrier(world_comm); */ + MPI_Barrier(MPI_COMM_WORLD); +#endif // Start write requests - PDC_Client_start_all_requests(write_transfer_requests, write_size); - // printf("PDCregion_transfer_start_all: checkpoint %d\n", __LINE__); + if (write_size > 0) + PDC_Client_start_all_requests(write_transfer_requests, write_size); + // printf("%s: checkpoint %d\n", __func__, __LINE__); // Start read requests - PDC_Client_start_all_requests(read_transfer_requests, read_size); + if (read_size > 0) + PDC_Client_start_all_requests(read_transfer_requests, read_size); /* - fprintf(stderr, "PDCregion_transfer_start_all: checkpoint %d\n", __LINE__); + fprintf(stderr, "%s: checkpoint %d\n", __func__, __LINE__); MPI_Barrier(MPI_COMM_WORLD); */ // For POSIX consistency, we block here until the data is received by the server - PDCregion_transfer_wait_all(posix_transfer_request_id, posix_size); - free(posix_transfer_request_id); + if (posix_size > 0) { + fprintf(stderr, "==PDC_CLIENT[%d]: %s wait for posix requests\n", pdc_client_mpi_rank_g, __func__); + PDCregion_transfer_wait_all(posix_transfer_request_id, posix_size); + free(posix_transfer_request_id); + } // Clean up memory finish_start_all_requests(write_transfer_requests, read_transfer_requests, write_size, read_size); - // fprintf(stderr, "PDCregion_transfer_start_all: checkpoint %d\n", __LINE__); - // MPI_Barrier(MPI_COMM_WORLD); + // fprintf(stderr, "%s: checkpoint %d\n", __func__, __LINE__); + +#ifdef ENABLE_MPI + MPI_Barrier(MPI_COMM_WORLD); + /* MPI_Barrier(world_comm); */ + /* MPI_Comm_free(&world_comm); */ +#endif + FUNC_LEAVE(ret_value); } @@ -1447,6 +1499,9 @@ PDCregion_transfer_start(pdcid_t transfer_request_id) goto done; } + PDC_Client_transfer_pthread_cnt_add(1); + /* PDC_Client_transfer_pthread_create(); */ + attach_local_transfer_request(transfer_request->obj_pointer, transfer_request_id); // Pack local region to a contiguous memory buffer @@ -1707,7 +1762,7 @@ PDCregion_transfer_wait_all(pdcid_t *transfer_request_id, int size) goto done; } - // printf("entered PDCregion_transfer_wait_all @ line %d\n", __LINE__); + // printf("entered %s @ line %d\n", __func__, __LINE__); total_requests = 0; transfer_request_head = NULL; for (i = 0; i < size; ++i) { @@ -1715,9 +1770,9 @@ PDCregion_transfer_wait_all(pdcid_t *transfer_request_id, int size) transfer_request = (pdc_transfer_request *)(transferinfo->obj_ptr); if (!transfer_request->metadata_id) { fprintf(stderr, - "PDCregion_transfer_wait_all [rank %d] @ line %d: Attempt to wait for a transfer request " + "%s [rank %d] @ line %d: Attempt to wait for a transfer request " "that has not been started.\n", - pdc_client_mpi_rank_g, __LINE__); + __func__, pdc_client_mpi_rank_g, __LINE__); ret_value = FAIL; goto done; } diff --git a/src/commons/utils/include/pdc_timing.h b/src/commons/utils/include/pdc_timing.h index 22b39148f..fd409732e 100644 --- a/src/commons/utils/include/pdc_timing.h +++ b/src/commons/utils/include/pdc_timing.h @@ -192,4 +192,7 @@ int PDC_server_timing_report(); int PDC_timing_report(const char *prefix); #endif +extern int pdc_timing_rank_g; +void PDC_get_time_str(char *cur_time); +int PDC_get_rank(); #endif diff --git a/src/commons/utils/pdc_timing.c b/src/commons/utils/pdc_timing.c index 95826b56b..cc8600b41 100644 --- a/src/commons/utils/pdc_timing.c +++ b/src/commons/utils/pdc_timing.c @@ -1,4 +1,5 @@ #include "pdc_timing.h" +#include "assert.h" #ifdef PDC_TIMING static double pdc_base_time; @@ -534,4 +535,33 @@ PDC_timing_report(const char *prefix __attribute__((unused))) { return 0; } +#endif // PDC_TIMING + +int pdc_timing_rank_g = -1; + +inline int +PDC_get_rank() +{ +#ifdef ENABLE_MPI + if (pdc_timing_rank_g == -1) + MPI_Comm_rank(MPI_COMM_WORLD, &pdc_timing_rank_g); + return pdc_timing_rank_g; +#else + return 0; #endif +} + +inline void +PDC_get_time_str(char *cur_time) +{ + struct timespec ts; + + assert(cur_time); + + clock_gettime(CLOCK_REALTIME, &ts); + sprintf(cur_time, "%04d-%02d-%02d %02d:%02d:%02d.%06ld", 1900 + localtime(&ts.tv_sec)->tm_year, + localtime(&ts.tv_sec)->tm_mon + 1, localtime(&ts.tv_sec)->tm_mday, localtime(&ts.tv_sec)->tm_hour, + localtime(&ts.tv_sec)->tm_min, localtime(&ts.tv_sec)->tm_sec, ts.tv_nsec / 1000); + + return; +} diff --git a/src/server/include/pdc_client_server_common.h b/src/server/include/pdc_client_server_common.h index 98edcacdc..ddc07249e 100644 --- a/src/server/include/pdc_client_server_common.h +++ b/src/server/include/pdc_client_server_common.h @@ -46,6 +46,7 @@ hg_thread_mutex_t lock_list_mutex_g; hg_thread_mutex_t meta_buf_map_mutex_g; hg_thread_mutex_t meta_obj_map_mutex_g; #endif +extern struct timeval last_cache_activity_timeval_g; #define PAGE_SIZE 4096 #define ADDR_MAX 1024 @@ -454,6 +455,16 @@ typedef struct { pdc_metadata_transfer_t ret; } metadata_query_out_t; +/* Define send_rpc_in_t */ +typedef struct { + int value; +} send_rpc_in_t; + +/* Define send_rpc_out_t */ +typedef struct { + int value; +} send_rpc_out_t; + /* Define metadata_add_tag_in_t */ typedef struct { uint64_t obj_id; @@ -804,6 +815,7 @@ typedef struct transfer_request_all_in_t { uint64_t total_buf_size; int32_t n_objs; uint8_t access_type; + int client_id; } transfer_request_all_in_t; /* Define transfer_request_all_out_t */ @@ -1755,6 +1767,36 @@ hg_proc_pdc_metadata_transfer_t(hg_proc_t proc, void *data) return ret; } +/* Define hg_proc_send_rpc_in_t */ +static HG_INLINE hg_return_t +hg_proc_send_rpc_in_t(hg_proc_t proc, void *data) +{ + hg_return_t ret; + send_rpc_in_t *struct_data = (send_rpc_in_t *)data; + + ret = hg_proc_int32_t(proc, &struct_data->value); + if (ret != HG_SUCCESS) { + // HG_LOG_ERROR("Proc error"); + return ret; + } + return ret; +} + +/* Define hg_proc_send_rpc_out_t */ +static HG_INLINE hg_return_t +hg_proc_send_rpc_out_t(hg_proc_t proc, void *data) +{ + hg_return_t ret; + send_rpc_out_t *struct_data = (send_rpc_out_t *)data; + + ret = hg_proc_int32_t(proc, &struct_data->value); + if (ret != HG_SUCCESS) { + // HG_LOG_ERROR("Proc error"); + return ret; + } + return ret; +} + /* Define hg_proc_metadata_add_tag_in_t */ static HG_INLINE hg_return_t hg_proc_metadata_add_tag_in_t(hg_proc_t proc, void *data) @@ -2805,6 +2847,11 @@ hg_proc_transfer_request_all_in_t(hg_proc_t proc, void *data) // HG_LOG_ERROR("Proc error"); return ret; } + ret = hg_proc_int32_t(proc, &struct_data->client_id); + if (ret != HG_SUCCESS) { + // HG_LOG_ERROR("Proc error"); + return ret; + } return ret; } @@ -4222,6 +4269,7 @@ hg_id_t PDC_metadata_add_tag_register(hg_class_t *hg_class); hg_id_t PDC_metadata_add_kvtag_register(hg_class_t *hg_class); hg_id_t PDC_metadata_del_kvtag_register(hg_class_t *hg_class); hg_id_t PDC_metadata_get_kvtag_register(hg_class_t *hg_class); +hg_id_t PDC_send_rpc_register(hg_class_t *hg_class); hg_id_t PDC_transfer_request_register(hg_class_t *hg_class); hg_id_t PDC_transfer_request_all_register(hg_class_t *hg_class); diff --git a/src/server/pdc_client_server_common.c b/src/server/pdc_client_server_common.c index d90f83bff..37df71ab0 100644 --- a/src/server/pdc_client_server_common.c +++ b/src/server/pdc_client_server_common.c @@ -63,6 +63,8 @@ hg_thread_pool_t *hg_test_thread_pool_fs_g = NULL; uint64_t pdc_id_seq_g = PDC_SERVER_ID_INTERVEL; // actual value for each server is set by PDC_Server_init() +struct timeval last_cache_activity_timeval_g; + #include "pdc_server_region_request_handler.h" hg_return_t @@ -1851,6 +1853,28 @@ HG_TEST_RPC_CB(metadata_delete, handle) FUNC_LEAVE(ret_value); } +/* static hg_return_t */ +// send_rpc_cb(hg_handle_t handle) +HG_TEST_RPC_CB(send_rpc, handle) +{ + send_rpc_in_t in; + send_rpc_out_t out; + hg_return_t ret_value = HG_SUCCESS; + + FUNC_ENTER(NULL); + + HG_Get_input(handle, &in); + fprintf(stderr, "==PDC_Server[]: %s received value from client %d\n", __func__, in.value); + + out.value = 1; + HG_Respond(handle, NULL, NULL, &out); + + HG_Free_input(handle, &in); + HG_Destroy(handle); + + FUNC_LEAVE(ret_value); +} + /* static hg_return_t */ // metadata_add_tag_cb(hg_handle_t handle) HG_TEST_RPC_CB(metadata_add_tag, handle) @@ -1928,7 +1952,14 @@ HG_TEST_RPC_CB(metadata_add_kvtag, handle) FUNC_ENTER(NULL); HG_Get_input(handle, &in); - PDC_Server_add_kvtag(&in, &out); + if (strcmp(in.kvtag.name, "PDC_NOOP") != 0) { + PDC_Server_add_kvtag(&in, &out); + } + else { + printf("==PDC_SERVER[]: received NOOP\n"); + out.ret = 1; + } + ret_value = HG_Respond(handle, NULL, NULL, &out); HG_Free_input(handle, &in); @@ -2113,7 +2144,8 @@ HG_TEST_RPC_CB(close_server, handle) fflush(stdout); FUNC_LEAVE(ret_value); } -/* + +#ifdef ENABLE_MULTITHREAD static HG_THREAD_RETURN_TYPE pdc_region_write_out_progress(void *arg) { @@ -2169,9 +2201,8 @@ pdc_region_write_out_progress(void *arg) FUNC_LEAVE(ret_value); } -*/ + // enter this function, transfer is done, data is pushed to buffer -/* static hg_return_t obj_map_region_release_bulk_transfer_thread_cb(const struct hg_cb_info *hg_cb_info) { @@ -2181,8 +2212,6 @@ obj_map_region_release_bulk_transfer_thread_cb(const struct hg_cb_info *hg_cb_in FUNC_ENTER(NULL); - - bulk_args = (struct buf_map_release_bulk_args *)hg_cb_info->arg; if (hg_cb_info->ret == HG_CANCELED) { @@ -2216,9 +2245,7 @@ obj_map_region_release_bulk_transfer_thread_cb(const struct hg_cb_info *hg_cb_in FUNC_LEAVE(ret_value); } -*/ -/* static HG_THREAD_RETURN_TYPE pdc_region_read_from_progress(void *arg) { @@ -2268,7 +2295,8 @@ pdc_region_read_from_progress(void *arg) FUNC_LEAVE(ret_value); } -*/ +#endif + // enter this function, transfer is done, data is in data server static hg_return_t transform_and_region_release_bulk_transfer_cb(const struct hg_cb_info *hg_cb_info) @@ -6563,11 +6591,14 @@ HG_TEST_THREAD_CB(query_read_obj_name_client_rpc) HG_TEST_THREAD_CB(send_client_storage_meta_rpc) HG_TEST_THREAD_CB(send_shm_bulk_rpc) HG_TEST_THREAD_CB(send_data_query_rpc) +HG_TEST_THREAD_CB(send_rpc) HG_TEST_THREAD_CB(send_nhits) HG_TEST_THREAD_CB(send_bulk_rpc) HG_TEST_THREAD_CB(get_sel_data_rpc) HG_TEST_THREAD_CB(send_read_sel_obj_id_rpc) +HG_TEST_THREAD_CB(dart_get_server_info) +HG_TEST_THREAD_CB(dart_perform_one_server) PDC_FUNC_DECLARE_REGISTER(gen_obj_id) PDC_FUNC_DECLARE_REGISTER(gen_cont_id) @@ -6582,6 +6613,7 @@ PDC_FUNC_DECLARE_REGISTER(notify_region_update) PDC_FUNC_DECLARE_REGISTER(metadata_query) PDC_FUNC_DECLARE_REGISTER(container_query) PDC_FUNC_DECLARE_REGISTER(metadata_add_tag) +PDC_FUNC_DECLARE_REGISTER(send_rpc) PDC_FUNC_DECLARE_REGISTER_IN_OUT(metadata_del_kvtag, metadata_get_kvtag_in_t, metadata_add_tag_out_t) PDC_FUNC_DECLARE_REGISTER_IN_OUT(metadata_add_kvtag, metadata_add_kvtag_in_t, metadata_add_tag_out_t) PDC_FUNC_DECLARE_REGISTER(metadata_get_kvtag) diff --git a/src/server/pdc_server.c b/src/server/pdc_server.c index d73fd9b9f..7ba1d1090 100644 --- a/src/server/pdc_server.c +++ b/src/server/pdc_server.c @@ -1880,7 +1880,7 @@ PDC_Server_loop(hg_context_t *hg_context) /* Do not try to make progress anymore if we're done */ if (hg_atomic_cas32(&close_server_g, 1, 1)) break; - hg_ret = HG_Progress(hg_context, 1000); + hg_ret = HG_Progress(hg_context, 200); } while (hg_ret == HG_SUCCESS || hg_ret == HG_TIMEOUT); @@ -2009,6 +2009,7 @@ PDC_Server_mercury_register() PDC_metadata_add_kvtag_register(hg_class_g); PDC_metadata_get_kvtag_register(hg_class_g); PDC_metadata_del_kvtag_register(hg_class_g); + PDC_send_rpc_register(hg_class_g); // bulk PDC_query_partial_register(hg_class_g); @@ -2384,4 +2385,4 @@ server_run(int argc, char *argv[]) MPI_Finalize(); #endif return 0; -} \ No newline at end of file +} diff --git a/src/server/pdc_server_region/pdc_server_region_cache.c b/src/server/pdc_server_region/pdc_server_region_cache.c index 40564e784..98cd5758c 100644 --- a/src/server/pdc_server_region/pdc_server_region_cache.c +++ b/src/server/pdc_server_region/pdc_server_region_cache.c @@ -1,6 +1,9 @@ +#include "pdc_client_server_common.h" #include "pdc_server_region_cache.h" #include "pdc_timing.h" +/* #define TANG_DEBUG 1 */ + #ifdef PDC_SERVER_CACHE #ifdef PDC_SERVER_CACHE_MAX_SIZE @@ -15,6 +18,12 @@ #define PDC_CACHE_FLUSH_TIME_INT 30 #endif +#ifdef PDC_SERVER_IDLE_CACHE_FLUSH_TIME +#define PDC_IDLE_CACHE_FLUSH_TIME_INT PDC_SERVER_IDLE_CACHE_FLUSH_TIME +#else +#define PDC_IDLE_CACHE_FLUSH_TIME_INT 2 +#endif + typedef struct pdc_region_cache { struct pdc_region_info * region_cache_info; struct pdc_region_cache *next; @@ -38,6 +47,7 @@ static pthread_mutex_t pdc_cache_mutex; static int pdc_recycle_close_flag; static size_t total_cache_size; static size_t maximum_cache_size; +static int pdc_idle_flush_time_g; int PDC_region_server_cache_init() @@ -47,19 +57,27 @@ PDC_region_server_cache_init() pdc_recycle_close_flag = 0; pthread_mutex_init(&pdc_obj_cache_list_mutex, NULL); pthread_mutex_init(&pdc_cache_mutex, NULL); - pthread_create(&pdc_recycle_thread, NULL, &PDC_region_cache_clock_cycle, NULL); total_cache_size = 0; p = getenv("PDC_SERVER_CACHE_MAX_SIZE"); - if (p != NULL) { + if (p != NULL) maximum_cache_size = atol(p); - } - else { + else maximum_cache_size = MAX_CACHE_SIZE; - } + + p = getenv("PDC_SERVER_IDLE_CACHE_FLUSH_TIME"); + if (p != NULL) + pdc_idle_flush_time_g = atol(p); + else + pdc_idle_flush_time_g = PDC_IDLE_CACHE_FLUSH_TIME_INT; obj_cache_list = NULL; obj_cache_list_end = NULL; + + pthread_create(&pdc_recycle_thread, NULL, &PDC_region_cache_clock_cycle, NULL); + + gettimeofday(&last_cache_activity_timeval_g, NULL); + return 0; } @@ -401,10 +419,16 @@ PDC_region_cache_register(uint64_t obj_id, int obj_ndim, const uint64_t *obj_dim pdc_obj_cache * obj_cache_iter, *obj_cache = NULL; struct pdc_region_info *region_cache_info; if (obj_ndim != ndim && obj_ndim > 0) { - printf("PDC_region_cache_register reports obj_ndim != ndim, %d != %d\n", obj_ndim, ndim); + printf("%s reports obj_ndim != ndim, %d != %d\n", __func__, obj_ndim, ndim); return FAIL; } +#ifdef TANG_DEBUG + char cur_time[64]; + PDC_get_time_str(cur_time); + printf("%s ==PDC_SERVER[%d]: %s for %llu\n", cur_time, PDC_get_rank(), __func__, obj_id); +#endif + pthread_mutex_lock(&pdc_obj_cache_list_mutex); obj_cache_iter = obj_cache_list; @@ -472,14 +496,12 @@ PDC_region_cache_register(uint64_t obj_id, int obj_ndim, const uint64_t *obj_dim memcpy(region_cache_info->buf, buf, sizeof(char) * buf_size); total_cache_size += buf_size; + gettimeofday(&last_cache_activity_timeval_g, NULL); + pthread_mutex_unlock(&pdc_obj_cache_list_mutex); if (total_cache_size > maximum_cache_size) { - int server_rank = 0; -#ifdef ENABLE_MPI - MPI_Comm_rank(MPI_COMM_WORLD, &server_rank); -#endif - printf("==PDC_SERVER[%d]: server cache full %.1f / %.1f MB, will flush to storage\n", server_rank, + printf("==PDC_SERVER[%d]: server cache full %.1f / %.1f MB, will flush to storage\n", PDC_get_rank(), total_cache_size / 1048576.0, maximum_cache_size / 1048576.0); PDC_region_cache_flush_all(); } @@ -522,19 +544,14 @@ PDC_transfer_request_data_write_out(uint64_t obj_id, int obj_ndim, const uint64_ pdc_obj_cache * obj_cache, *obj_cache_iter; pdc_region_cache *region_cache_iter; uint64_t * overlap_offset, *overlap_size; - // char * buf_merged; - // uint64_t * offset_merged, size_merged; - // int merge_status; - - perr_t ret_value = SUCCEED; + perr_t ret_value = SUCCEED; + char cur_time[64]; FUNC_ENTER(NULL); #ifdef PDC_TIMING double start = MPI_Wtime(); #endif - // Write 1GB at a time - uint64_t write_size = 0; if (region_info->ndim >= 1) write_size = unit * region_info->size[0]; @@ -543,6 +560,11 @@ PDC_transfer_request_data_write_out(uint64_t obj_id, int obj_ndim, const uint64_ if (region_info->ndim >= 3) write_size *= region_info->size[2]; +#ifdef TANG_DEBUG + PDC_get_time_str(cur_time); + printf("%s ==PDC_SERVER[%d]: enter %s for %llu\n", cur_time, PDC_get_rank(), __func__, obj_id); +#endif + pthread_mutex_lock(&pdc_obj_cache_list_mutex); obj_cache = NULL; @@ -592,16 +614,30 @@ PDC_transfer_request_data_write_out(uint64_t obj_id, int obj_ndim, const uint64_ } } pthread_mutex_unlock(&pdc_obj_cache_list_mutex); + + /* PDC_get_time_str(cur_time); */ + /* printf("%s ==PDC_SERVER[%d]: %s after unlock\n", cur_time, PDC_get_rank(), __func__); */ + if (!flag) { PDC_region_cache_register(obj_id, obj_ndim, obj_dims, buf, write_size, region_info->offset, region_info->size, region_info->ndim, unit); } + /* else { */ + /* PDC_get_time_str(cur_time); */ + /* printf("%s ==PDC_SERVER[%d]: %s write is fully contained with cached region\n", cur_time, + * PDC_get_rank(), __func__); */ + /* } */ // PDC_Server_data_write_out2(obj_id, region_info, buf, unit); #ifdef PDC_TIMING pdc_server_timings->PDCcache_write += MPI_Wtime() - start; #endif +#ifdef TANG_DEBUG + PDC_get_time_str(cur_time); + printf("%s ==PDC_SERVER[%d]: leaving %s\n", cur_time, PDC_get_rank(), __func__); +#endif + // done: fflush(stdout); FUNC_LEAVE(ret_value); @@ -679,7 +715,7 @@ sort_by_offset(const void *elem1, const void *elem2) } int -PDC_region_cache_flush_by_pointer(uint64_t obj_id, pdc_obj_cache *obj_cache) +PDC_region_cache_flush_by_pointer(uint64_t obj_id, pdc_obj_cache *obj_cache, int flag) { int i, nflush = 0; pdc_region_cache * region_cache_iter, *region_cache_temp; @@ -688,15 +724,18 @@ PDC_region_cache_flush_by_pointer(uint64_t obj_id, pdc_obj_cache *obj_cache) char ** buf, **new_buf, *buf_ptr = NULL; uint64_t * start, *end, *new_start, *new_end; int merged_request_size = 0; - int server_rank = 0; uint64_t unit; struct pdc_region_info **obj_regions; + char cur_time[64]; #ifdef PDC_TIMING double start_time = MPI_Wtime(); #endif + /* PDC_get_time_str(cur_time); */ + /* printf("%s ==PDC_SERVER[%d.%d]: enter %s\n", cur_time, PDC_get_rank(), flag, __func__); */ + + // For 1D case, we can merge regions to minimize the number of POSIX calls. if (obj_cache->ndim == 1 && obj_cache->region_cache_size) { - // For 1D case, we can merge regions to minimize the number of POSIX calls. start = (uint64_t *)malloc(sizeof(uint64_t) * obj_cache->region_cache_size * 2); end = start + obj_cache->region_cache_size; buf = (char **)malloc(sizeof(char *) * obj_cache->region_cache_size); @@ -759,14 +798,15 @@ PDC_region_cache_flush_by_pointer(uint64_t obj_id, pdc_obj_cache *obj_cache) free(region_cache_temp); } nflush += merged_request_size; - } + } // End for 1D -#ifdef ENABLE_MPI - MPI_Comm_rank(MPI_COMM_WORLD, &server_rank); -#endif // Iterate through all cache regions and use POSIX I/O to write them back to file system. region_cache_iter = obj_cache->region_cache; while (region_cache_iter != NULL) { + /* PDC_get_time_str(cur_time); */ + /* printf("%s ==PDC_SERVER[%d.%d]: %s going to write out\n", */ + /* cur_time, PDC_get_rank(), flag, __func__); */ + region_cache_info = region_cache_iter->region_cache_info; PDC_Server_transfer_request_io(obj_id, obj_cache->ndim, obj_cache->dims, region_cache_info, region_cache_info->buf, region_cache_info->unit, 1); @@ -777,8 +817,11 @@ PDC_region_cache_flush_by_pointer(uint64_t obj_id, pdc_obj_cache *obj_cache) if (obj_cache->ndim >= 3) write_size *= region_cache_info->size[2]; - printf("==PDC_SERVER[%d]: server flushed %.1f / %.1f MB to storage\n", server_rank, - write_size / 1048576.0, total_cache_size / 1048576.0); + if (write_size > 0) { + PDC_get_time_str(cur_time); + printf("%s ==PDC_SERVER[%d.%d]: %s server flushed %.1f / %.1f MB to storage\n", cur_time, + PDC_get_rank(), flag, __func__, write_size / 1048576.0, total_cache_size / 1048576.0); + } total_cache_size -= write_size; free(region_cache_info->offset); @@ -800,6 +843,10 @@ PDC_region_cache_flush_by_pointer(uint64_t obj_id, pdc_obj_cache *obj_cache) #ifdef PDC_TIMING pdc_server_timings->PDCcache_flush += MPI_Wtime() - start_time; #endif + + /* PDC_get_time_str(cur_time); */ + /* printf("%s ==PDC_SERVER[%d.%d]: leave %s\n", cur_time, PDC_get_rank(), flag, __func__); */ + return nflush; } @@ -820,7 +867,14 @@ PDC_region_cache_flush(uint64_t obj_id) // printf("server error: flushing object that does not exist\n"); return 1; } - PDC_region_cache_flush_by_pointer(obj_id, obj_cache); + +#ifdef TANG_DEBUG + char cur_time[64]; + PDC_get_time_str(cur_time); + printf("%s ==PDC_SERVER[%d]: %s going to flush\n", cur_time, PDC_get_rank(), __func__); +#endif + + PDC_region_cache_flush_by_pointer(obj_id, obj_cache, 0); return 0; } @@ -832,7 +886,14 @@ PDC_region_cache_flush_all() obj_cache_iter = obj_cache_list; while (obj_cache_iter != NULL) { - PDC_region_cache_flush_by_pointer(obj_cache_iter->obj_id, obj_cache_iter); + +#ifdef TANG_DEBUG + char cur_time[64]; + PDC_get_time_str(cur_time); + printf("%s ==PDC_SERVER[%d]: %s going to flush\n", cur_time, PDC_get_rank(), __func__); +#endif + + PDC_region_cache_flush_by_pointer(obj_cache_iter->obj_id, obj_cache_iter, 0); obj_cache_temp = obj_cache_iter; obj_cache_iter = obj_cache_iter->next; if (obj_cache_temp->ndim) { @@ -854,7 +915,9 @@ PDC_region_cache_clock_cycle(void *ptr) struct timeval finish_time; int nflush = 0; double flush_frequency_s = PDC_CACHE_FLUSH_TIME_INT, elapsed_time; - int server_rank = 0; + time_t t; + struct tm * tm; + char cur_time[64]; char *p = getenv("PDC_SERVER_CACHE_FLUSH_FREQUENCY_S"); if (p != NULL) @@ -863,44 +926,64 @@ PDC_region_cache_clock_cycle(void *ptr) if (ptr == NULL) { obj_cache_iter = NULL; } + while (1) { - nflush = 0; + // pdc_cache_mutex only protect pdc_recycle_close_flag pthread_mutex_lock(&pdc_cache_mutex); if (!pdc_recycle_close_flag) { - pthread_mutex_lock(&pdc_obj_cache_list_mutex); - gettimeofday(¤t_time, NULL); + /* pthread_mutex_lock(&pdc_obj_cache_list_mutex); */ obj_cache_iter = obj_cache_list; - nflush = 0; while (obj_cache_iter != NULL) { + gettimeofday(¤t_time, NULL); + /* pthread_mutex_lock(&pdc_obj_cache_list_mutex); */ obj_cache = obj_cache_iter; - // flush every *flush_frequency_s seconds - elapsed_time = current_time.tv_sec - obj_cache->timestamp.tv_sec + - (current_time.tv_usec - obj_cache->timestamp.tv_usec) / 1000000.0; - /* if (current_time.tv_sec - obj_cache->timestamp.tv_sec > flush_frequency_s) { */ - if (elapsed_time >= flush_frequency_s) { - nflush += PDC_region_cache_flush_by_pointer(obj_cache->obj_id, obj_cache); + + elapsed_time = current_time.tv_sec - last_cache_activity_timeval_g.tv_sec + + (current_time.tv_usec - last_cache_activity_timeval_g.tv_usec) / 1000000.0; + + if (elapsed_time >= pdc_idle_flush_time_g) { + + /* PDC_get_time_str(cur_time); */ + /* printf("%s ==PDC_SERVER[%d.1]: %s going to flush with idle time\n", */ + /* cur_time, PDC_get_rank(), __func__); */ + + pthread_mutex_lock(&pdc_obj_cache_list_mutex); + nflush = PDC_region_cache_flush_by_pointer(obj_cache->obj_id, obj_cache, 1); + pthread_mutex_unlock(&pdc_obj_cache_list_mutex); + + gettimeofday(&finish_time, NULL); + elapsed_time = finish_time.tv_sec - current_time.tv_sec + + (finish_time.tv_usec - current_time.tv_usec) / 1000000.0; + + if (nflush > 0) { + PDC_get_time_str(cur_time); + printf("%s ==PDC_SERVER[%d.1]: flushed %d regions to storage, took %.4fs\n", cur_time, + PDC_get_rank(), nflush, elapsed_time); + } + } + else { + obj_cache_iter = obj_cache_iter->next; + /* pthread_mutex_unlock(&pdc_obj_cache_list_mutex); */ + + /* PDC_get_time_str(cur_time); */ + /* fprintf(stderr, "%s ==PDC_SERVER[%d]: stop flush to allow processing new RPCs\n", */ + /* cur_time,PDC_get_rank()); */ + + usleep(300000); + break; } obj_cache_iter = obj_cache_iter->next; - } - if (nflush > 0) { -#ifdef ENABLE_MPI - MPI_Comm_rank(MPI_COMM_WORLD, &server_rank); -#endif - gettimeofday(&finish_time, NULL); - elapsed_time = finish_time.tv_sec - current_time.tv_sec + - (finish_time.tv_usec - current_time.tv_usec) / 1000000.0; - fprintf(stderr, - "==PDC_SERVER[%d]: flushed %d regions to storage (full/every %.0fs), took %.4fs\n", - server_rank, nflush, flush_frequency_s, elapsed_time); - } - pthread_mutex_unlock(&pdc_obj_cache_list_mutex); - } + /* pthread_mutex_unlock(&pdc_obj_cache_list_mutex); */ + usleep(300000); + } // End while obj_cache_iter + /* pthread_mutex_unlock(&pdc_obj_cache_list_mutex); */ + } // End if pdc_recycle_close_flag else { pthread_mutex_unlock(&pdc_cache_mutex); break; } pthread_mutex_unlock(&pdc_cache_mutex); - usleep(500); + usleep(10000); } return 0; } @@ -977,10 +1060,16 @@ PDC_region_fetch(uint64_t obj_id, int obj_ndim, const uint64_t *obj_dims, struct } if (!flag) { if (obj_cache != NULL) { - PDC_region_cache_flush_by_pointer(obj_id, obj_cache); +#ifdef TANG_DEBUG + char cur_time[64]; + PDC_get_time_str(cur_time); + printf("%s ==PDC_SERVER[%d]: %s going to flush\n", cur_time, PDC_get_rank(), __func__); +#endif + + PDC_region_cache_flush_by_pointer(obj_id, obj_cache, 0); } PDC_Server_transfer_request_io(obj_id, obj_ndim, obj_dims, region_info, buf, unit, 0); } return 0; } -#endif \ No newline at end of file +#endif diff --git a/src/server/pdc_server_region/pdc_server_region_request_handler.h b/src/server/pdc_server_region/pdc_server_region_request_handler.h index 04ab48481..c6f96f6ab 100644 --- a/src/server/pdc_server_region/pdc_server_region_request_handler.h +++ b/src/server/pdc_server_region/pdc_server_region_request_handler.h @@ -1,3 +1,5 @@ +/* #define TANG_DEBUG 1 */ + hg_return_t transfer_request_all_bulk_transfer_read_cb2(const struct hg_cb_info *info) { @@ -171,9 +173,17 @@ transfer_request_all_bulk_transfer_write_cb(const struct hg_cb_info *info) hg_return_t ret = HG_SUCCESS; struct pdc_region_info * remote_reg_info; int i; + char cur_time[64]; FUNC_ENTER(NULL); +#ifdef TANG_DEBUG + PDC_get_time_str(cur_time); + printf("%s ==PDC_SERVER[%d]: enter %s\n", cur_time, PDC_get_rank(), __func__); +#endif + + gettimeofday(&last_cache_activity_timeval_g, NULL); + #ifdef PDC_TIMING double end = MPI_Wtime(), start; pdc_server_timings->PDCreg_transfer_request_start_all_write_bulk_rpc += end - local_bulk_args->start_time; @@ -182,11 +192,11 @@ transfer_request_all_bulk_transfer_write_cb(const struct hg_cb_info *info) start = MPI_Wtime(); #endif - // printf("entering transfer_request_all_bulk_transfer_write_cb\n"); remote_reg_info = (struct pdc_region_info *)malloc(sizeof(struct pdc_region_info)); request_data.n_objs = local_bulk_args->in.n_objs; parse_bulk_data(local_bulk_args->data_buf, &request_data, PDC_WRITE); // print_bulk_data(&request_data); + #ifndef PDC_SERVER_CACHE data_server_region_t **temp_ptrs = (data_server_region_t **)malloc(sizeof(data_server_region_t *) * request_data.n_objs); @@ -195,6 +205,12 @@ transfer_request_all_bulk_transfer_write_cb(const struct hg_cb_info *info) PDC_Server_register_obj_region_by_pointer(temp_ptrs + i, request_data.obj_id[i], 1); } #endif + +#ifdef TANG_DEBUG + PDC_get_time_str(cur_time); + printf("%s ==PDC_SERVER[%d]: %s before (cache) writing\n", cur_time, PDC_get_rank(), __func__); +#endif + for (i = 0; i < request_data.n_objs; ++i) { remote_reg_info->ndim = request_data.remote_ndim[i]; remote_reg_info->offset = request_data.remote_offset[i]; @@ -208,6 +224,7 @@ transfer_request_all_bulk_transfer_write_cb(const struct hg_cb_info *info) request_data.obj_dims[i], remote_reg_info, (void *)request_data.data_buf[i], request_data.unit[i], 1); #endif + #if 0 uint64_t j; fprintf(stderr, "server write array, offset = %lu, size = %lu:", request_data.remote_offset[i][0], request_data.remote_length[i][0]); @@ -220,6 +237,12 @@ transfer_request_all_bulk_transfer_write_cb(const struct hg_cb_info *info) PDC_finish_request(local_bulk_args->transfer_request_id[i]); pthread_mutex_unlock(&transfer_request_status_mutex); } + +#ifdef TANG_DEBUG + PDC_get_time_str(cur_time); + printf("%s ==PDC_SERVER[%d]: %s after (cache) writing\n", cur_time, PDC_get_rank(), __func__); +#endif + #ifndef PDC_SERVER_CACHE for (i = 0; i < request_data.n_objs; ++i) { PDC_Server_unregister_obj_region_by_pointer(temp_ptrs[i], 1); @@ -245,6 +268,11 @@ transfer_request_all_bulk_transfer_write_cb(const struct hg_cb_info *info) pdc_timestamp_register(pdc_transfer_request_inner_write_all_bulk_timestamps, start, end); #endif +#ifdef TANG_DEBUG + PDC_get_time_str(cur_time); + printf("%s ==PDC_SERVER[%d]: leaving %s\n", cur_time, PDC_get_rank(), __func__); +#endif + FUNC_LEAVE(ret); } @@ -325,6 +353,14 @@ transfer_request_bulk_transfer_write_cb(const struct hg_cb_info *info) FUNC_ENTER(NULL); +#ifdef TANG_DEBUG + char cur_time[64]; + PDC_get_time_str(cur_time); + printf("%s ==PDC_SERVER[%d]: enter %s\n", cur_time, PDC_get_rank(), __func__); +#endif + + gettimeofday(&last_cache_activity_timeval_g, NULL); + #ifdef PDC_TIMING double end = MPI_Wtime(), start; pdc_server_timings->PDCreg_transfer_request_start_write_bulk_rpc += end - local_bulk_args->start_time; @@ -333,8 +369,6 @@ transfer_request_bulk_transfer_write_cb(const struct hg_cb_info *info) start = MPI_Wtime(); #endif - // printf("entering transfer bulk callback\n"); - remote_reg_info = (struct pdc_region_info *)malloc(sizeof(struct pdc_region_info)); remote_reg_info->ndim = (local_bulk_args->in.remote_region).ndim; @@ -399,6 +433,8 @@ transfer_request_bulk_transfer_read_cb(const struct hg_cb_info *info) start = MPI_Wtime(); #endif + gettimeofday(&last_cache_activity_timeval_g, NULL); + pthread_mutex_lock(&transfer_request_status_mutex); PDC_finish_request(local_bulk_args->transfer_request_id); pthread_mutex_unlock(&transfer_request_status_mutex); @@ -549,6 +585,7 @@ HG_TEST_RPC_CB(transfer_request_all, handle) transfer_request_all_out_t out; hg_return_t ret_value = HG_SUCCESS; int i; + char cur_time[64]; FUNC_ENTER(NULL); @@ -558,6 +595,14 @@ HG_TEST_RPC_CB(transfer_request_all, handle) HG_Get_input(handle, &in); +#ifdef TANG_DEBUG + PDC_get_time_str(cur_time); + printf("%s ==PDC_SERVER[%d]: enter %s process CLIENT[%d]\n", cur_time, PDC_get_rank(), __func__, + in.client_id); +#endif + + gettimeofday(&last_cache_activity_timeval_g, NULL); + info = HG_Get_info(handle); local_bulk_args = (struct transfer_request_all_local_bulk_args *)malloc( sizeof(struct transfer_request_all_local_bulk_args)); @@ -568,11 +613,12 @@ HG_TEST_RPC_CB(transfer_request_all, handle) local_bulk_args->in = in; local_bulk_args->transfer_request_id = (uint64_t *)malloc(sizeof(uint64_t) * in.n_objs); - pthread_mutex_lock(&transfer_request_id_mutex); + // [Tang]TODO is this necessary? + /* pthread_mutex_lock(&transfer_request_id_mutex); */ for (i = 0; i < in.n_objs; ++i) { local_bulk_args->transfer_request_id[i] = PDC_transfer_request_id_register(); } - pthread_mutex_unlock(&transfer_request_id_mutex); + /* pthread_mutex_unlock(&transfer_request_id_mutex); */ pthread_mutex_lock(&transfer_request_status_mutex); // Metadata ID is in ascending order. We only need to return the first value, the client knows the size. @@ -591,10 +637,21 @@ HG_TEST_RPC_CB(transfer_request_all, handle) ret_value = HG_Bulk_create(info->hg_class, 1, &(local_bulk_args->data_buf), &(local_bulk_args->in.total_buf_size), HG_BULK_READWRITE, &(local_bulk_args->bulk_handle)); + +#ifdef TANG_DEBUG + PDC_get_time_str(cur_time); + printf("%s ==PDC_SERVER[x]: %s start bulk \n", cur_time, __func__); +#endif + ret_value = HG_Bulk_transfer(info->context, transfer_request_all_bulk_transfer_write_cb, local_bulk_args, HG_BULK_PULL, info->addr, in.local_bulk_handle, 0, local_bulk_args->bulk_handle, 0, local_bulk_args->in.total_buf_size, HG_OP_ID_IGNORE); + +#ifdef TANG_DEBUG + PDC_get_time_str(cur_time); + printf("%s ==PDC_SERVER[x]: %s done bulk\n", cur_time, __func__); +#endif } else { // Read operation has to receive region metadata first. There will be another bulk transfer triggered @@ -623,6 +680,12 @@ HG_TEST_RPC_CB(transfer_request_all, handle) } #endif +#ifdef TANG_DEBUG + PDC_get_time_str(cur_time); + printf("%s ==PDC_SERVER[%d]: leaving %s responded CLIENT[%d]\n", cur_time, PDC_get_rank(), __func__, + in.client_id); +#endif + fflush(stdout); FUNC_LEAVE(ret_value); } @@ -771,6 +834,8 @@ HG_TEST_RPC_CB(transfer_request, handle) double start = MPI_Wtime(), end; #endif + gettimeofday(&last_cache_activity_timeval_g, NULL); + HG_Get_input(handle, &in); info = HG_Get_info(handle); @@ -785,9 +850,9 @@ HG_TEST_RPC_CB(transfer_request, handle) if (in.remote_region.ndim >= 3) { total_mem_size *= in.remote_region.count_2; } - pthread_mutex_lock(&transfer_request_id_mutex); + /* pthread_mutex_lock(&transfer_request_id_mutex); */ out.metadata_id = PDC_transfer_request_id_register(); - pthread_mutex_unlock(&transfer_request_id_mutex); + /* pthread_mutex_unlock(&transfer_request_id_mutex); */ pthread_mutex_lock(&transfer_request_status_mutex); PDC_commit_request(out.metadata_id); pthread_mutex_unlock(&transfer_request_status_mutex); diff --git a/src/server/pdc_server_region/pdc_server_region_transfer.c b/src/server/pdc_server_region/pdc_server_region_transfer.c index 8f7ab8886..0fb4d5a67 100644 --- a/src/server/pdc_server_region/pdc_server_region_transfer.c +++ b/src/server/pdc_server_region/pdc_server_region_transfer.c @@ -1,19 +1,8 @@ #include "pdc_client_server_common.h" #include "pdc_server_data.h" +#include "pdc_timing.h" static int io_by_region_g = 1; -int -get_server_rank() -{ -#ifdef ENABLE_MPI - int result; - MPI_Comm_rank(MPI_COMM_WORLD, &result); - return result; -#else - return 0; -#endif -} - int try_reset_dims() { @@ -92,6 +81,7 @@ PDC_finish_request(uint64_t transfer_request_id) perr_t ret_value = SUCCEED; transfer_request_wait_out_t out; transfer_request_wait_all_out_t out_all; + char cur_time[64]; FUNC_ENTER(NULL); @@ -108,10 +98,20 @@ PDC_finish_request(uint64_t transfer_request_id) printf("PDC SERVER PDC_finish_request out type unset error %d\n", __LINE__); } if (ptr->out_type) { + + /* PDC_get_time_str(cur_time); */ + /* printf("%s ==PDC_SERVER[%d]: enter %s, out_all ret\n", cur_time, PDC_get_rank(), + * __func__); */ + out_all.ret = 1; ret_value = HG_Respond(ptr->handle, NULL, NULL, &out_all); } else { + + /* PDC_get_time_str(cur_time); */ + /* printf("%s ==PDC_SERVER[%d]: enter %s, out ret\n", cur_time, PDC_get_rank(), + * __func__); */ + out.ret = 1; ret_value = HG_Respond(ptr->handle, NULL, NULL, &out); } @@ -277,11 +277,13 @@ PDC_Server_transfer_request_io(uint64_t obj_id, int obj_ndim, const uint64_t *ob char storage_location[ADDR_MAX]; ssize_t io_size; uint64_t i, j; - - int server_rank = get_server_rank(); + char cur_time[64]; FUNC_ENTER(NULL); + /* PDC_get_time_str(cur_time); */ + /* printf("%s ==PDC_SERVER[%d]: enter %s\n", cur_time, PDC_get_rank(), __func__); */ + if (io_by_region_g || obj_ndim == 0) { // PDC_Server_register_obj_region(obj_id); if (is_write) { @@ -309,7 +311,7 @@ PDC_Server_transfer_request_io(uint64_t obj_id, int obj_ndim, const uint64_t *ob } // Data path prefix will be $SCRATCH/pdc_data/$obj_id/ snprintf(storage_location, ADDR_MAX, "%.200s/pdc_data/%" PRIu64 "/server%d/s%04d.bin", data_path, obj_id, - server_rank, server_rank); + PDC_get_rank(), PDC_get_rank()); PDC_mkdir(storage_location); fd = open(storage_location, O_RDWR | O_CREAT, 0666); @@ -394,6 +396,9 @@ PDC_Server_transfer_request_io(uint64_t obj_id, int obj_ndim, const uint64_t *ob close(fd); done: + /* PDC_get_time_str(cur_time); */ + /* printf("%s ==PDC_SERVER[%d]: leave %s\n", cur_time, PDC_get_rank(), __func__); */ + fflush(stdout); FUNC_LEAVE(ret_value); } diff --git a/src/tests/kvtag_query_scale_col.c b/src/tests/kvtag_query_scale_col.c index a2a4b8405..91c6d7481 100644 --- a/src/tests/kvtag_query_scale_col.c +++ b/src/tests/kvtag_query_scale_col.c @@ -31,6 +31,7 @@ #include "pdc.h" #include "pdc_client_connect.h" #include "string_utils.h" +#include "mpi.h" int assign_work_to_rank(int rank, int size, int nwork, int *my_count, int *my_start) diff --git a/src/tests/region_transfer_all.c b/src/tests/region_transfer_all.c index 88f8c9bd4..2ee7efaef 100644 --- a/src/tests/region_transfer_all.c +++ b/src/tests/region_transfer_all.c @@ -138,10 +138,10 @@ main(int argc, char **argv) ret = PDCprop_set_obj_transfer_region_type(obj_prop, PDC_REGION_LOCAL); break; } - case 3: { - ret = PDCprop_set_obj_transfer_region_type(obj_prop, PDC_REGION_DYNAMIC); - break; - } + /* case 3: { */ + /* ret = PDCprop_set_obj_transfer_region_type(obj_prop, PDC_REGION_DYNAMIC); */ + /* break; */ + /* } */ default: { } } @@ -329,6 +329,8 @@ main(int argc, char **argv) } } + MPI_Barrier(MPI_COMM_WORLD); + // close object for (i = 0; i < OBJ_NUM; ++i) { if (PDCobj_close(obj[i]) < 0) { diff --git a/src/tests/region_transfer_all_2D.c b/src/tests/region_transfer_all_2D.c index 03fe83d6f..3ae2dbf67 100644 --- a/src/tests/region_transfer_all_2D.c +++ b/src/tests/region_transfer_all_2D.c @@ -137,10 +137,10 @@ main(int argc, char **argv) ret = PDCprop_set_obj_transfer_region_type(obj_prop, PDC_REGION_LOCAL); break; } - case 3: { - ret = PDCprop_set_obj_transfer_region_type(obj_prop, PDC_REGION_DYNAMIC); - break; - } + /* case 3: { */ + /* ret = PDCprop_set_obj_transfer_region_type(obj_prop, PDC_REGION_DYNAMIC); */ + /* break; */ + /* } */ default: { } } diff --git a/src/tests/region_transfer_all_3D.c b/src/tests/region_transfer_all_3D.c index bcfaadb4c..d745dbed4 100644 --- a/src/tests/region_transfer_all_3D.c +++ b/src/tests/region_transfer_all_3D.c @@ -140,10 +140,10 @@ main(int argc, char **argv) ret = PDCprop_set_obj_transfer_region_type(obj_prop, PDC_REGION_LOCAL); break; } - case 3: { - ret = PDCprop_set_obj_transfer_region_type(obj_prop, PDC_REGION_DYNAMIC); - break; - } + /* case 3: { */ + /* ret = PDCprop_set_obj_transfer_region_type(obj_prop, PDC_REGION_DYNAMIC); */ + /* break; */ + /* } */ default: { } } diff --git a/src/tests/region_transfer_all_append.c b/src/tests/region_transfer_all_append.c index 28ebfb2c6..19292e288 100644 --- a/src/tests/region_transfer_all_append.c +++ b/src/tests/region_transfer_all_append.c @@ -133,10 +133,10 @@ main(int argc, char **argv) ret = PDCprop_set_obj_transfer_region_type(obj_prop, PDC_REGION_LOCAL); break; } - case 3: { - ret = PDCprop_set_obj_transfer_region_type(obj_prop, PDC_REGION_DYNAMIC); - break; - } + /* case 3: { */ + /* ret = PDCprop_set_obj_transfer_region_type(obj_prop, PDC_REGION_DYNAMIC); */ + /* break; */ + /* } */ default: { } } @@ -260,6 +260,9 @@ main(int argc, char **argv) /* printf("successfully closed local region @ line %d\n", __LINE__); */ /* } */ + if (rank == 0) + printf("Test 0 done\n"); + for (i = 0; i < OBJ_NUM; ++i) { sprintf(obj_name, "o%d_%d", i, rank); obj[i] = PDCobj_open(obj_name, pdc); @@ -373,6 +376,9 @@ main(int argc, char **argv) } } + if (rank == 0) + printf("Test 1 done\n"); + for (i = 0; i < OBJ_NUM; ++i) { sprintf(obj_name, "o%d_%d", i, rank); obj[i] = PDCobj_open(obj_name, pdc); @@ -495,6 +501,9 @@ main(int argc, char **argv) /* printf("successfully local region @ line %d\n", __LINE__); */ /* } */ + if (rank == 0) + printf("Test 2 done\n"); + // Now we rewrite the whole object and check its values. // open object for (i = 0; i < OBJ_NUM; ++i) { @@ -612,6 +621,9 @@ main(int argc, char **argv) /* } */ } + if (rank == 0) + printf("Test 3 done\n"); + // open object for (i = 0; i < OBJ_NUM; ++i) { sprintf(obj_name, "o%d_%d", i, rank); @@ -732,6 +744,9 @@ main(int argc, char **argv) } } + if (rank == 0) + printf("Test 4 done\n"); + // close a container if (PDCcont_close(cont) < 0) { printf("fail to close container c1 @ line %d\n", __LINE__); diff --git a/src/tests/region_transfer_all_append_2D.c b/src/tests/region_transfer_all_append_2D.c index c747afcab..23d0c9f62 100644 --- a/src/tests/region_transfer_all_append_2D.c +++ b/src/tests/region_transfer_all_append_2D.c @@ -142,10 +142,10 @@ main(int argc, char **argv) ret = PDCprop_set_obj_transfer_region_type(obj_prop, PDC_REGION_LOCAL); break; } - case 3: { - ret = PDCprop_set_obj_transfer_region_type(obj_prop, PDC_REGION_DYNAMIC); - break; - } + /* case 3: { */ + /* ret = PDCprop_set_obj_transfer_region_type(obj_prop, PDC_REGION_DYNAMIC); */ + /* break; */ + /* } */ default: { } } diff --git a/src/tests/region_transfer_all_append_3D.c b/src/tests/region_transfer_all_append_3D.c index 254771379..298afd5c7 100644 --- a/src/tests/region_transfer_all_append_3D.c +++ b/src/tests/region_transfer_all_append_3D.c @@ -144,10 +144,10 @@ main(int argc, char **argv) ret = PDCprop_set_obj_transfer_region_type(obj_prop, PDC_REGION_LOCAL); break; } - case 3: { - ret = PDCprop_set_obj_transfer_region_type(obj_prop, PDC_REGION_DYNAMIC); - break; - } + /* case 3: { */ + /* ret = PDCprop_set_obj_transfer_region_type(obj_prop, PDC_REGION_DYNAMIC); */ + /* break; */ + /* } */ default: { } } diff --git a/src/tests/region_transfer_all_split_wait.c b/src/tests/region_transfer_all_split_wait.c index a69fa9ba6..61caaf3ce 100644 --- a/src/tests/region_transfer_all_split_wait.c +++ b/src/tests/region_transfer_all_split_wait.c @@ -129,7 +129,7 @@ main(int argc, char **argv) break; } case 3: { - ret = PDCprop_set_obj_transfer_region_type(obj_prop, PDC_REGION_DYNAMIC); + /* ret = PDCprop_set_obj_transfer_region_type(obj_prop, PDC_REGION_DYNAMIC); */ break; } default: { diff --git a/src/tests/region_transfer_set_dims.c b/src/tests/region_transfer_set_dims.c index 1efd1d3be..dc49dc1aa 100644 --- a/src/tests/region_transfer_set_dims.c +++ b/src/tests/region_transfer_set_dims.c @@ -140,7 +140,7 @@ main(int argc, char **argv) break; } case 3: { - ret = PDCprop_set_obj_transfer_region_type(obj_prop, PDC_REGION_DYNAMIC); + /* ret = PDCprop_set_obj_transfer_region_type(obj_prop, PDC_REGION_DYNAMIC); */ break; } default: { diff --git a/src/tests/region_transfer_set_dims_2D.c b/src/tests/region_transfer_set_dims_2D.c index 991658709..b8adbf48a 100644 --- a/src/tests/region_transfer_set_dims_2D.c +++ b/src/tests/region_transfer_set_dims_2D.c @@ -143,7 +143,7 @@ main(int argc, char **argv) break; } case 3: { - ret = PDCprop_set_obj_transfer_region_type(obj_prop, PDC_REGION_DYNAMIC); + /* ret = PDCprop_set_obj_transfer_region_type(obj_prop, PDC_REGION_DYNAMIC); */ break; } default: { diff --git a/src/tests/region_transfer_set_dims_3D.c b/src/tests/region_transfer_set_dims_3D.c index 1752c6315..6b9827593 100644 --- a/src/tests/region_transfer_set_dims_3D.c +++ b/src/tests/region_transfer_set_dims_3D.c @@ -145,7 +145,7 @@ main(int argc, char **argv) break; } case 3: { - ret = PDCprop_set_obj_transfer_region_type(obj_prop, PDC_REGION_DYNAMIC); + /* ret = PDCprop_set_obj_transfer_region_type(obj_prop, PDC_REGION_DYNAMIC); */ break; } default: { diff --git a/src/tests/region_transfer_status.c b/src/tests/region_transfer_status.c index c513c56c2..a474a5d94 100644 --- a/src/tests/region_transfer_status.c +++ b/src/tests/region_transfer_status.c @@ -174,6 +174,7 @@ main(int argc, char **argv) ret_value = 1; } + PDCregion_transfer_wait(transfer_request); PDCregion_transfer_close(transfer_request); if (PDCregion_close(reg) < 0) { @@ -218,6 +219,7 @@ main(int argc, char **argv) ret_value = 1; } + PDCregion_transfer_wait(transfer_request); PDCregion_transfer_close(transfer_request); // Check if data written previously has been correctly read. diff --git a/src/tests/vpicio_mts.c b/src/tests/vpicio_mts.c index 4c4679459..102891ff0 100644 --- a/src/tests/vpicio_mts.c +++ b/src/tests/vpicio_mts.c @@ -32,6 +32,7 @@ #include #include #include "pdc.h" +#include "pdc_timing.h" #define NPARTICLES 8388608 @@ -51,35 +52,26 @@ int main(int argc, char **argv) { int rank = 0, size = 1; - pdcid_t pdc_id, cont_prop, cont_id; - pdcid_t obj_prop_xx, obj_prop_yy, obj_prop_zz, obj_prop_pxx, obj_prop_pyy, obj_prop_pzz, obj_prop_id11, - obj_prop_id22; + pdcid_t pdc_id, cont_prop, cont_id, region_local, region_remote; + pdcid_t obj_prop_float, obj_prop_int; pdcid_t obj_xx, obj_yy, obj_zz, obj_pxx, obj_pyy, obj_pzz, obj_id11, obj_id22; - pdcid_t region_x, region_y, region_z, region_px, region_py, region_pz, region_id1, region_id2; - pdcid_t region_xx, region_yy, region_zz, region_pxx, region_pyy, region_pzz, region_id11, region_id22; - perr_t ret; #ifdef ENABLE_MPI MPI_Comm comm; #else int comm = 1; #endif - float * x, *y, *z; - float * px, *py, *pz; - int * id1, *id2; - int x_dim = 64; - int y_dim = 64; - int z_dim = 64; - uint64_t numparticles, i; - uint64_t dims[1]; - int ndim = 1; - uint64_t *offset; - uint64_t *offset_remote; - uint64_t *mysize; - double t0, t1; - uint64_t steps = 1, sleeptime = 0; + float * x, *y, *z, *px, *py, *pz; + int * id1, *id2; + int x_dim = 64, y_dim = 64, z_dim = 64, ndim = 1, steps = 1, sleeptime = 0; + uint64_t numparticles, dims[1], offset_local[1], offset_remote[1], mysize[1]; + double t0, t1; + char cur_time[64]; + time_t t; + struct tm *tm; pdcid_t transfer_request_x, transfer_request_y, transfer_request_z, transfer_request_px, transfer_request_py, transfer_request_pz, transfer_request_id1, transfer_request_id2; + pdcid_t transfer_requests[8]; #ifdef ENABLE_MPI MPI_Init(&argc, &argv); @@ -95,15 +87,14 @@ main(int argc, char **argv) sleeptime = atoi(argv[3]); } if (rank == 0) - printf("Writing %" PRIu64 " number of particles for %llu steps with %d clients.\n", numparticles, - steps, size); + printf("Writing %" PRIu64 " number of particles for %d steps with %d clients.\n", numparticles, steps, + size); dims[0] = numparticles * size; - x = (float *)malloc(numparticles * sizeof(float)); - y = (float *)malloc(numparticles * sizeof(float)); - z = (float *)malloc(numparticles * sizeof(float)); - + x = (float *)malloc(numparticles * sizeof(float)); + y = (float *)malloc(numparticles * sizeof(float)); + z = (float *)malloc(numparticles * sizeof(float)); px = (float *)malloc(numparticles * sizeof(float)); py = (float *)malloc(numparticles * sizeof(float)); pz = (float *)malloc(numparticles * sizeof(float)); @@ -118,45 +109,28 @@ main(int argc, char **argv) cont_prop = PDCprop_create(PDC_CONT_CREATE, pdc_id); if (cont_prop <= 0) { printf("Fail to create container property @ line %d!\n", __LINE__); - return 1; + return FAIL; } // create a container cont_id = PDCcont_create_col("c1", cont_prop); if (cont_id <= 0) { printf("Fail to create container @ line %d!\n", __LINE__); - return 1; + return FAIL; } // create an object property - obj_prop_xx = PDCprop_create(PDC_OBJ_CREATE, pdc_id); - - PDCprop_set_obj_dims(obj_prop_xx, 1, dims); - PDCprop_set_obj_type(obj_prop_xx, PDC_FLOAT); - PDCprop_set_obj_user_id(obj_prop_xx, getuid()); - PDCprop_set_obj_app_name(obj_prop_xx, "VPICIO"); - PDCprop_set_obj_tags(obj_prop_xx, "tag0=1"); - - obj_prop_yy = PDCprop_obj_dup(obj_prop_xx); - PDCprop_set_obj_type(obj_prop_yy, PDC_FLOAT); - - obj_prop_zz = PDCprop_obj_dup(obj_prop_xx); - PDCprop_set_obj_type(obj_prop_zz, PDC_FLOAT); - - obj_prop_pxx = PDCprop_obj_dup(obj_prop_xx); - PDCprop_set_obj_type(obj_prop_pxx, PDC_FLOAT); - - obj_prop_pyy = PDCprop_obj_dup(obj_prop_xx); - PDCprop_set_obj_type(obj_prop_pyy, PDC_FLOAT); - - obj_prop_pzz = PDCprop_obj_dup(obj_prop_xx); - PDCprop_set_obj_type(obj_prop_pzz, PDC_FLOAT); - - obj_prop_id11 = PDCprop_obj_dup(obj_prop_xx); - PDCprop_set_obj_type(obj_prop_id11, PDC_INT); - - obj_prop_id22 = PDCprop_obj_dup(obj_prop_xx); - PDCprop_set_obj_type(obj_prop_id22, PDC_INT); - - for (i = 0; i < numparticles; i++) { + obj_prop_float = PDCprop_create(PDC_OBJ_CREATE, pdc_id); + PDCprop_set_obj_dims(obj_prop_float, 1, dims); + PDCprop_set_obj_type(obj_prop_float, PDC_FLOAT); + PDCprop_set_obj_user_id(obj_prop_float, getuid()); + PDCprop_set_obj_app_name(obj_prop_float, "VPICIO"); + PDCprop_set_obj_tags(obj_prop_float, "tag0=1"); + /* PDCprop_set_obj_transfer_region_type(obj_prop_float, PDC_REGION_LOCAL); */ + PDCprop_set_obj_transfer_region_type(obj_prop_float, PDC_REGION_STATIC); + + obj_prop_int = PDCprop_obj_dup(obj_prop_float); + PDCprop_set_obj_type(obj_prop_int, PDC_INT); + + for (uint64_t i = 0; i < numparticles; i++) { id1[i] = i; id2[i] = i * 2; x[i] = uniform_random_number() * x_dim; @@ -167,463 +141,262 @@ main(int argc, char **argv) pz[i] = ((float)id2[i] / numparticles) * z_dim; } - offset = (uint64_t *)malloc(sizeof(uint64_t) * ndim); - offset_remote = (uint64_t *)malloc(sizeof(uint64_t) * ndim); - mysize = (uint64_t *)malloc(sizeof(uint64_t) * ndim); - offset[0] = 0; + offset_local[0] = 0; offset_remote[0] = rank * numparticles; mysize[0] = numparticles; - // create a region - region_x = PDCregion_create(ndim, offset, mysize); - region_y = PDCregion_create(ndim, offset, mysize); - region_z = PDCregion_create(ndim, offset, mysize); - region_px = PDCregion_create(ndim, offset, mysize); - region_py = PDCregion_create(ndim, offset, mysize); - region_pz = PDCregion_create(ndim, offset, mysize); - region_id1 = PDCregion_create(ndim, offset, mysize); - region_id2 = PDCregion_create(ndim, offset, mysize); - - region_xx = PDCregion_create(ndim, offset_remote, mysize); - region_yy = PDCregion_create(ndim, offset_remote, mysize); - region_zz = PDCregion_create(ndim, offset_remote, mysize); - region_pxx = PDCregion_create(ndim, offset_remote, mysize); - region_pyy = PDCregion_create(ndim, offset_remote, mysize); - region_pzz = PDCregion_create(ndim, offset_remote, mysize); - region_id11 = PDCregion_create(ndim, offset_remote, mysize); - region_id22 = PDCregion_create(ndim, offset_remote, mysize); - - for (i = 0; i < steps; i++) { + // create local and remote region + region_local = PDCregion_create(ndim, offset_local, mysize); + region_remote = PDCregion_create(ndim, offset_remote, mysize); + + for (int iter = 0; iter < steps; iter++) { #ifdef ENABLE_MPI MPI_Barrier(MPI_COMM_WORLD); + PDC_get_time_str(cur_time); + if (rank == 0) + printf("\n[%s] #Step %d\n", cur_time, iter); t0 = MPI_Wtime(); - if (rank == 0) { - printf("\n#Step %llu\n", i); - } #endif - PDCprop_set_obj_time_step(obj_prop_xx, i); - PDCprop_set_obj_time_step(obj_prop_yy, i); - PDCprop_set_obj_time_step(obj_prop_zz, i); - PDCprop_set_obj_time_step(obj_prop_pxx, i); - PDCprop_set_obj_time_step(obj_prop_pyy, i); - PDCprop_set_obj_time_step(obj_prop_pzz, i); - PDCprop_set_obj_time_step(obj_prop_id11, i); - PDCprop_set_obj_time_step(obj_prop_id22, i); - - obj_xx = PDCobj_create_mpi(cont_id, "obj-var-xx", obj_prop_xx, 0, comm); + PDCprop_set_obj_time_step(obj_prop_float, iter); + PDCprop_set_obj_time_step(obj_prop_int, iter); + + obj_xx = PDCobj_create_mpi(cont_id, "obj-var-xx", obj_prop_float, 0, comm); if (obj_xx == 0) { - printf("Error getting an object id of %s from server, exit...\n", "obj-var-xx"); - exit(-1); + printf("Error getting an object id of %s from server\n", "x"); + return FAIL; } - obj_yy = PDCobj_create_mpi(cont_id, "obj-var-yy", obj_prop_yy, 0, comm); + obj_yy = PDCobj_create_mpi(cont_id, "obj-var-yy", obj_prop_float, 0, comm); if (obj_yy == 0) { - printf("Error getting an object id of %s from server, exit...\n", "obj-var-yy"); - exit(-1); + printf("Error getting an object id of %s from server\n", "y"); + return FAIL; } - obj_zz = PDCobj_create_mpi(cont_id, "obj-var-zz", obj_prop_zz, 0, comm); + obj_zz = PDCobj_create_mpi(cont_id, "obj-var-zz", obj_prop_float, 0, comm); if (obj_zz == 0) { - printf("Error getting an object id of %s from server, exit...\n", "obj-var-zz"); - exit(-1); + printf("Error getting an object id of %s from server\n", "z"); + return FAIL; } - obj_pxx = PDCobj_create_mpi(cont_id, "obj-var-pxx", obj_prop_pxx, 0, comm); + obj_pxx = PDCobj_create_mpi(cont_id, "obj-var-pxx", obj_prop_float, 0, comm); if (obj_pxx == 0) { - printf("Error getting an object id of %s from server, exit...\n", "obj-var-pxx"); - exit(-1); + printf("Error getting an object id of %s from server\n", "px"); + return FAIL; } - obj_pyy = PDCobj_create_mpi(cont_id, "obj-var-pyy", obj_prop_pyy, 0, comm); + obj_pyy = PDCobj_create_mpi(cont_id, "obj-var-pyy", obj_prop_float, 0, comm); if (obj_pyy == 0) { - printf("Error getting an object id of %s from server, exit...\n", "obj-var-pyy"); - exit(-1); + printf("Error getting an object id of %s from server\n", "py"); + return FAIL; } - obj_pzz = PDCobj_create_mpi(cont_id, "obj-var-pzz", obj_prop_pzz, 0, comm); + obj_pzz = PDCobj_create_mpi(cont_id, "obj-var-pzz", obj_prop_float, 0, comm); if (obj_pzz == 0) { - printf("Error getting an object id of %s from server, exit...\n", "obj-var-pzz"); - exit(-1); + printf("Error getting an object id of %s from server\n", "pz"); + return FAIL; } - obj_id11 = PDCobj_create_mpi(cont_id, "id11", obj_prop_id11, 0, comm); + obj_id11 = PDCobj_create_mpi(cont_id, "id11", obj_prop_int, 0, comm); if (obj_id11 == 0) { - printf("Error getting an object id of %s from server, exit...\n", "obj_id11"); - exit(-1); + printf("Error getting an object id of %s from server\n", "id1"); + return FAIL; } - obj_id22 = PDCobj_create_mpi(cont_id, "id22", obj_prop_id22, 0, comm); + obj_id22 = PDCobj_create_mpi(cont_id, "id22", obj_prop_int, 0, comm); if (obj_id22 == 0) { - printf("Error getting an object id of %s from server, exit...\n", "obj_id22"); - exit(-1); + printf("Error getting an object id of %s from server\n", "id2"); + return FAIL; } #ifdef ENABLE_MPI MPI_Barrier(MPI_COMM_WORLD); t1 = MPI_Wtime(); - if (rank == 0) { - printf("Obj create time: %.5e\n", t1 - t0); - } + PDC_get_time_str(cur_time); + if (rank == 0) + printf("[%s] Obj create time: %.5e\n", cur_time, t1 - t0); #endif - transfer_request_x = PDCregion_transfer_create(&x[0], PDC_WRITE, obj_xx, region_x, region_xx); - if (transfer_request_x == 0) { - printf("Array x transfer request creation failed\n"); - return 1; - } - transfer_request_y = PDCregion_transfer_create(&y[0], PDC_WRITE, obj_yy, region_y, region_yy); - if (transfer_request_y == 0) { - printf("Array y transfer request creation failed\n"); - return 1; - } - transfer_request_z = PDCregion_transfer_create(&z[0], PDC_WRITE, obj_zz, region_z, region_zz); - if (transfer_request_z == 0) { - printf("Array z transfer request creation failed\n"); - return 1; - } - transfer_request_px = PDCregion_transfer_create(&px[0], PDC_WRITE, obj_pxx, region_px, region_pxx); - if (transfer_request_px == 0) { - printf("Array px transfer request creation failed\n"); - return 1; - } - transfer_request_py = PDCregion_transfer_create(&py[0], PDC_WRITE, obj_pyy, region_py, region_pyy); - if (transfer_request_py == 0) { - printf("Array py transfer request creation failed\n"); - return 1; - } - transfer_request_pz = PDCregion_transfer_create(&pz[0], PDC_WRITE, obj_pzz, region_pz, region_pzz); - if (transfer_request_pz == 0) { - printf("Array pz transfer request creation failed\n"); - return 1; - } - transfer_request_id1 = - PDCregion_transfer_create(&id1[0], PDC_WRITE, obj_id11, region_id1, region_id11); - if (transfer_request_id1 == 0) { - printf("Array id1 transfer request creation failed\n"); - return 1; - } - transfer_request_id2 = - PDCregion_transfer_create(&id2[0], PDC_WRITE, obj_id22, region_id2, region_id22); - if (transfer_request_id2 == 0) { - printf("Array id2 transfer request creation failed\n"); - return 1; + transfer_requests[0] = + PDCregion_transfer_create(&x[0], PDC_WRITE, obj_xx, region_local, region_remote); + if (transfer_requests[0] == 0) { + printf("x transfer request creation failed\n"); + return FAIL; + } + transfer_requests[1] = + PDCregion_transfer_create(&y[0], PDC_WRITE, obj_yy, region_local, region_remote); + if (transfer_requests[1] == 0) { + printf("y transfer request creation failed\n"); + return FAIL; + } + transfer_requests[2] = + PDCregion_transfer_create(&z[0], PDC_WRITE, obj_zz, region_local, region_remote); + if (transfer_requests[2] == 0) { + printf("z transfer request creation failed\n"); + return FAIL; + } + transfer_requests[3] = + PDCregion_transfer_create(&px[0], PDC_WRITE, obj_pxx, region_local, region_remote); + if (transfer_requests[3] == 0) { + printf("px transfer request creation failed\n"); + return FAIL; + } + transfer_requests[4] = + PDCregion_transfer_create(&py[0], PDC_WRITE, obj_pyy, region_local, region_remote); + if (transfer_requests[4] == 0) { + printf("py transfer request creation failed\n"); + return FAIL; + } + transfer_requests[5] = + PDCregion_transfer_create(&pz[0], PDC_WRITE, obj_pzz, region_local, region_remote); + if (transfer_requests[5] == 0) { + printf("pz transfer request creation failed\n"); + return FAIL; + } + transfer_requests[6] = + PDCregion_transfer_create(&id1[0], PDC_WRITE, obj_id11, region_local, region_remote); + if (transfer_requests[6] == 0) { + printf("id1 transfer request creation failed\n"); + return FAIL; + } + transfer_requests[7] = + PDCregion_transfer_create(&id2[0], PDC_WRITE, obj_id22, region_local, region_remote); + if (transfer_requests[7] == 0) { + printf("id2 transfer request creation failed\n"); + return FAIL; } #ifdef ENABLE_MPI MPI_Barrier(MPI_COMM_WORLD); t0 = MPI_Wtime(); - if (rank == 0) { - printf("Transfer create time: %.5e\n", t0 - t1); - } + PDC_get_time_str(cur_time); + if (rank == 0) + printf("[%s] Transfer create time: %.5e\n", cur_time, t0 - t1); #endif - ret = PDCregion_transfer_start(transfer_request_x); - if (ret != SUCCEED) { - printf("Failed to start transfer for region_xx\n"); - return 1; - } - ret = PDCregion_transfer_start(transfer_request_y); - if (ret != SUCCEED) { - printf("Failed to start transfer for region_yy\n"); - return 1; - } - ret = PDCregion_transfer_start(transfer_request_z); - if (ret != SUCCEED) { - printf("Failed to start transfer for region_zz\n"); - return 1; - } - ret = PDCregion_transfer_start(transfer_request_px); - if (ret != SUCCEED) { - printf("Failed to start transfer for region_pxx\n"); - return 1; - } - ret = PDCregion_transfer_start(transfer_request_py); - if (ret != SUCCEED) { - printf("Failed to start transfer for region_pyy\n"); - return 1; - } - ret = PDCregion_transfer_start(transfer_request_pz); - if (ret != SUCCEED) { - printf("Failed to start transfer for region_pzz\n"); - return 1; - } - ret = PDCregion_transfer_start(transfer_request_id1); - if (ret != SUCCEED) { - printf("Failed to start transfer for region_id11\n"); - return 1; - } - ret = PDCregion_transfer_start(transfer_request_id2); - if (ret != SUCCEED) { - printf("Failed to start transfer for region_id22\n"); - return 1; + if (PDCregion_transfer_start_all(transfer_requests, 8) != SUCCEED) { + printf("Failed to start transfer requests\n"); + return FAIL; } #ifdef ENABLE_MPI MPI_Barrier(MPI_COMM_WORLD); t1 = MPI_Wtime(); - if (rank == 0) { - printf("Transfer start time: %.5e\n", t1 - t0); - } + PDC_get_time_str(cur_time); + if (rank == 0) + printf("[%s] Transfer start time: %.5e\n", cur_time, t1 - t0); #endif - - ret = PDCregion_transfer_wait(transfer_request_x); - if (ret != SUCCEED) { - printf("Failed to transfer wait for region_xx\n"); - return 1; - } - ret = PDCregion_transfer_wait(transfer_request_y); - if (ret != SUCCEED) { - printf("Failed to transfer wait for region_yy\n"); - return 1; - } - ret = PDCregion_transfer_wait(transfer_request_z); - if (ret != SUCCEED) { - printf("Failed to transfer wait for region_zz\n"); - return 1; - } - ret = PDCregion_transfer_wait(transfer_request_px); - if (ret != SUCCEED) { - printf("Failed to transfer wait for region_pxx\n"); - return 1; - } - ret = PDCregion_transfer_wait(transfer_request_py); - if (ret != SUCCEED) { - printf("Failed to transfer wait for region_pyy\n"); - return 1; - } - ret = PDCregion_transfer_wait(transfer_request_pz); - if (ret != SUCCEED) { - printf("Failed to transfer wait for region_pzz\n"); - return 1; - } - ret = PDCregion_transfer_wait(transfer_request_id1); - if (ret != SUCCEED) { - printf("Failed to transfer wait for region_id11\n"); - return 1; - } - ret = PDCregion_transfer_wait(transfer_request_id2); - if (ret != SUCCEED) { - printf("Failed to transfer wait for region_id22\n"); - return 1; + // Emulate compute with sleep + if (iter != steps - 1) { + PDC_get_time_str(cur_time); + if (rank == 0) + printf("[%s] Sleep start: %llu.00\n", cur_time, sleeptime); + sleep(sleeptime); + PDC_get_time_str(cur_time); + if (rank == 0) + printf("[%s] Sleep end: %llu.00\n", cur_time, sleeptime); } #ifdef ENABLE_MPI MPI_Barrier(MPI_COMM_WORLD); t0 = MPI_Wtime(); - if (rank == 0) { - printf("Transfer wait time: %.5e\n", t0 - t1); - } #endif - ret = PDCregion_transfer_close(transfer_request_x); - if (ret != SUCCEED) { - printf("region xx transfer close failed\n"); - return 1; - } - ret = PDCregion_transfer_close(transfer_request_y); - if (ret != SUCCEED) { - printf("region yy transfer close failed\n"); - return 1; - } - ret = PDCregion_transfer_close(transfer_request_z); - if (ret != SUCCEED) { - printf("region zz transfer close failed\n"); - return 1; - } - ret = PDCregion_transfer_close(transfer_request_px); - if (ret != SUCCEED) { - printf("region pxx transfer close failed\n"); - return 1; - } - ret = PDCregion_transfer_close(transfer_request_py); - if (ret != SUCCEED) { - printf("region pyy transfer close failed\n"); - return 1; - } - ret = PDCregion_transfer_close(transfer_request_pz); - if (ret != SUCCEED) { - printf("region pzz transfer close failed\n"); - return 1; - } - ret = PDCregion_transfer_close(transfer_request_id1); - if (ret != SUCCEED) { - printf("region id11 transfer close failed\n"); - return 1; - } - ret = PDCregion_transfer_close(transfer_request_id2); - if (ret != SUCCEED) { - printf("region id22 transfer close failed\n"); - return 1; + if (PDCregion_transfer_wait_all(transfer_requests, 8) != SUCCEED) { + printf("Failed to transfer wait all\n"); + return FAIL; } #ifdef ENABLE_MPI MPI_Barrier(MPI_COMM_WORLD); t1 = MPI_Wtime(); - if (rank == 0) { - printf("Transfer close time: %.5e\n", t1 - t0); + PDC_get_time_str(cur_time); + if (rank == 0) + printf("[%s] Transfer wait time: %.5e\n", cur_time, t1 - t0); +#endif + + for (int j = 0; j < 8; j++) { + if (PDCregion_transfer_close(transfer_requests[j]) != SUCCEED) { + printf("region transfer close failed\n"); + return FAIL; + } } + +#ifdef ENABLE_MPI + MPI_Barrier(MPI_COMM_WORLD); + t0 = MPI_Wtime(); + PDC_get_time_str(cur_time); + if (rank == 0) + printf("[%s] Transfer close time: %.5e\n", cur_time, t0 - t1); #endif - if (PDCobj_close(obj_xx) < 0) { + if (PDCobj_close(obj_xx) != SUCCEED) { printf("fail to close obj_xx\n"); - return 1; + return FAIL; } - - if (PDCobj_close(obj_yy) < 0) { + if (PDCobj_close(obj_yy) != SUCCEED) { printf("fail to close object obj_yy\n"); - return 1; + return FAIL; } - if (PDCobj_close(obj_zz) < 0) { + if (PDCobj_close(obj_zz) != SUCCEED) { printf("fail to close object obj_zz\n"); - return 1; + return FAIL; } - if (PDCobj_close(obj_pxx) < 0) { + if (PDCobj_close(obj_pxx) != SUCCEED) { printf("fail to close object obj_pxx\n"); - return 1; + return FAIL; } - if (PDCobj_close(obj_pyy) < 0) { + if (PDCobj_close(obj_pyy) != SUCCEED) { printf("fail to close object obj_pyy\n"); - return 1; + return FAIL; } - if (PDCobj_close(obj_pzz) < 0) { + if (PDCobj_close(obj_pzz) != SUCCEED) { printf("fail to close object obj_pzz\n"); - return 1; + return FAIL; } - if (PDCobj_close(obj_id11) < 0) { + if (PDCobj_close(obj_id11) != SUCCEED) { printf("fail to close object obj_id11\n"); - return 1; + return FAIL; } - if (PDCobj_close(obj_id22) < 0) { + if (PDCobj_close(obj_id22) != SUCCEED) { printf("fail to close object obj_id22\n"); - return 1; + return FAIL; } #ifdef ENABLE_MPI MPI_Barrier(MPI_COMM_WORLD); - t0 = MPI_Wtime(); - if (rank == 0) { - printf("Obj close time: %.5e\n", t0 - t1); - } + t1 = MPI_Wtime(); + PDC_get_time_str(cur_time); + if (rank == 0) + printf("[%s] Obj close time: %.5e\n", cur_time, t1 - t0); #endif - if (i != steps - 1) { - sleep(sleeptime); - if (rank == 0) { - printf("Sleep time: %llu.00\n", sleeptime); - } - } } // End for steps PDC_timing_report("write"); - if (PDCprop_close(obj_prop_xx) < 0) { - printf("Fail to close obj property obj_prop_xx\n"); - return 1; - } - if (PDCprop_close(obj_prop_yy) < 0) { - printf("Fail to close obj property obj_prop_yy\n"); - return 1; - } - if (PDCprop_close(obj_prop_zz) < 0) { - printf("Fail to close obj property obj_prop_zz\n"); - return 1; - } - if (PDCprop_close(obj_prop_pxx) < 0) { - printf("Fail to close obj property obj_prop_pxx\n"); - return 1; - } - if (PDCprop_close(obj_prop_pyy) < 0) { - printf("Fail to close obj property obj_prop_pyy\n"); - return 1; - } - if (PDCprop_close(obj_prop_pzz) < 0) { - printf("Fail to close obj property obj_prop_pzz\n"); - return 1; - } - if (PDCprop_close(obj_prop_id11) < 0) { - printf("Fail to close obj property obj_prop_id11\n"); - return 1; - } - if (PDCprop_close(obj_prop_id22) < 0) { - printf("Fail to close obj property obj_prop_id22\n"); - return 1; - } - if (PDCregion_close(region_x) < 0) { - printf("fail to close region region_x\n"); - return 1; - } - if (PDCregion_close(region_y) < 0) { - printf("fail to close region region_y\n"); - return 1; - } - if (PDCregion_close(region_z) < 0) { - printf("fail to close region region_z\n"); - return 1; - } - if (PDCregion_close(region_px) < 0) { - printf("fail to close region region_px\n"); - return 1; - } - if (PDCregion_close(region_py) < 0) { - printf("fail to close region region_py\n"); - return 1; - } - if (PDCobj_close(region_pz) < 0) { - printf("fail to close region region_pz\n"); - return 1; - } - if (PDCobj_close(region_id1) < 0) { - printf("fail to close region region_id1\n"); - return 1; - } - if (PDCobj_close(region_id2) < 0) { - printf("fail to close region region_id2\n"); - return 1; - } - if (PDCregion_close(region_xx) < 0) { - printf("fail to close region region_xx\n"); - return 1; - } - if (PDCregion_close(region_yy) < 0) { - printf("fail to close region region_yy\n"); - return 1; - } - if (PDCregion_close(region_zz) < 0) { - printf("fail to close region region_zz\n"); - return 1; - } - if (PDCregion_close(region_pxx) < 0) { - printf("fail to close region region_pxx\n"); - return 1; - } - if (PDCregion_close(region_pyy) < 0) { - printf("fail to close region region_pyy\n"); - return 1; + if (PDCprop_close(obj_prop_float) != SUCCEED) { + printf("Fail to close obj_prop_float\n"); + return FAIL; } - if (PDCregion_close(region_pzz) < 0) { - printf("fail to close region region_pzz\n"); - return 1; + if (PDCprop_close(obj_prop_int) != SUCCEED) { + printf("Fail to close obj_prop_int\n"); + return FAIL; } - if (PDCobj_close(region_id11) < 0) { - printf("fail to close region region_id11\n"); - return 1; + if (PDCregion_close(region_local) != SUCCEED) { + printf("fail to close local region \n"); + return FAIL; } - if (PDCobj_close(region_id22) < 0) { - printf("fail to close region region_id22\n"); - return 1; + if (PDCobj_close(region_remote) != SUCCEED) { + printf("fail to close remote region\n"); + return FAIL; } - // close a container - if (PDCcont_close(cont_id) < 0) { - printf("fail to close container c1\n"); - return 1; + if (PDCcont_close(cont_id) != SUCCEED) { + printf("fail to close container\n"); + return FAIL; } - // close a container property - if (PDCprop_close(cont_prop) < 0) { - printf("Fail to close property @ line %d\n", __LINE__); - return 1; + if (PDCprop_close(cont_prop) != SUCCEED) { + printf("Fail to close property\n"); + return FAIL; } - if (PDCclose(pdc_id) < 0) { + if (PDCclose(pdc_id) != SUCCEED) { printf("fail to close PDC\n"); - return 1; + return FAIL; } - free(offset); - free(offset_remote); - free(mysize); free(x); free(y); free(z);