diff --git a/src/api/pdc.c b/src/api/pdc.c index 86a3e09eb..332731fff 100644 --- a/src/api/pdc.c +++ b/src/api/pdc.c @@ -36,6 +36,8 @@ #include "pdc_interface.h" #include "pdc_client_connect.h" +#include "pdc_timing.h" + pbool_t err_occurred = FALSE; perr_t PDC_class__close(struct _pdc_class *p); diff --git a/src/api/pdc_client_server_common.c b/src/api/pdc_client_server_common.c index 7fbd6a50f..e548c6132 100644 --- a/src/api/pdc_client_server_common.c +++ b/src/api/pdc_client_server_common.c @@ -2144,7 +2144,7 @@ buf_map_region_release_bulk_transfer_cb(const struct hg_cb_info *hg_cb_info) (remote_reg_info->offset)[2] = (bulk_args->remote_region_nounit).start_2; (remote_reg_info->size)[2] = (bulk_args->remote_region_nounit).count_2; } - + PDC_Server_data_write_out(bulk_args->remote_obj_id, remote_reg_info, bulk_args->data_buf, (bulk_args->in).data_unit); // Perform lock release function diff --git a/src/api/pdc_region.h b/src/api/pdc_region.h index 43aa53a6f..bc1883d03 100644 --- a/src/api/pdc_region.h +++ b/src/api/pdc_region.h @@ -40,6 +40,7 @@ struct pdc_region_info { bool mapping; int registered_op; void *buf; + size_t unit; }; /*********************/ diff --git a/src/server/pdc_server.c b/src/server/pdc_server.c index 9ac1b9ae6..10b572e9f 100644 --- a/src/server/pdc_server.c +++ b/src/server/pdc_server.c @@ -148,6 +148,7 @@ double total_mem_usage_g = 0.0; pdc_data_server_io_list_t *pdc_data_server_read_list_head_g = NULL; pdc_data_server_io_list_t *pdc_data_server_write_list_head_g = NULL; update_storage_meta_list_t *pdc_update_storage_meta_list_head_g = NULL; +extern data_server_region_t *dataserver_region_g; /* @@ -1147,12 +1148,27 @@ perr_t PDC_Server_checkpoint() } } + region_count += n_region; if (n_write_region != n_region) { printf("==PDC_SERVER[%d]: %s - ERROR with number of regions", pdc_server_rank_g, __func__); ret_value = FAIL; goto done; } + // Write storage region info + data_server_region_t *region = NULL; + region = PDC_Server_get_obj_region(elt->obj_id); + if(region) { + DL_COUNT(region->region_storage_head, region_elt, n_region); + fwrite(&n_region, sizeof(int), 1, file); + DL_FOREACH(region->region_storage_head, region_elt) { + fwrite(region_elt, sizeof(region_list_t), 1, file); + } + } + else { + fwrite(&n_region, sizeof(int), 1, file); + } + metadata_size++; region_count += n_region; } @@ -1315,8 +1331,8 @@ perr_t PDC_Server_restart(char *filename) goto done; } - if (n_region == 0) - continue; + /* if (n_region == 0) */ + /* continue; */ total_region += n_region; @@ -1381,6 +1397,20 @@ perr_t PDC_Server_restart(char *filename) DL_APPEND((metadata+i)->storage_region_list_head, region_list); } // For j + + // read storage region info + fread(&n_region, sizeof(int), 1, file); + data_server_region_t *new_obj_reg = (data_server_region_t *)calloc(1, sizeof(struct data_server_region_t)); + DL_APPEND(dataserver_region_g, new_obj_reg); + new_obj_reg->obj_id = (metadata+i)->obj_id; + for (j = 0; j < n_region; j++) { + region_list_t *new_region_list = (region_list_t*)malloc(sizeof(region_list_t)); + fread(new_region_list, sizeof(region_list_t), 1, file); + DL_APPEND(new_obj_reg->region_storage_head, new_region_list); + } + + total_region += n_region; + DL_SORT((metadata+i)->storage_region_list_head, region_cmp); } // For i @@ -1866,7 +1896,6 @@ int main(int argc, char *argv[]) // Exit from the loop, start finalize process #ifndef DISABLE_CHECKPOINT -#else char *tmp_env_char = getenv("PDC_DISABLE_CHECKPOINT"); if (tmp_env_char != NULL && strcmp(tmp_env_char, "TRUE")==0) { if (pdc_server_rank_g == 0) printf("==PDC_SERVER[0]: checkpoint disabled!\n"); diff --git a/src/server/pdc_server_data.c b/src/server/pdc_server_data.c index 530ac0d12..09e0cc454 100644 --- a/src/server/pdc_server_data.c +++ b/src/server/pdc_server_data.c @@ -4330,14 +4330,141 @@ static perr_t PDC_Server_data_io_direct(pdc_access_t io_type, uint64_t obj_id, s FUNC_LEAVE(ret_value); } -perr_t PDC_Server_data_write_out(uint64_t obj_id, struct pdc_region_info *region_info, void *buf, size_t unit) +int PDC_region_cache_register(uint64_t obj_id, const char *buf, size_t buf_size, const uint64_t *offset, const uint64_t *size, int ndim, size_t unit) { + pdc_obj_cache *temp; + struct pdc_region_info *temp2; + pdc_obj_cache *obj_cache = NULL; + int i; + struct pdc_region_info *region_cache; + + for ( i = 0; i < obj_cache_list.obj_cache_size; ++i ) { + if (obj_cache_list.pdc_obj_cache[i].obj_id == obj_id) { + obj_cache = obj_cache_list.pdc_obj_cache + i; + } + } + if (obj_cache == NULL) { + if (obj_cache_list.obj_cache_max_size == 0) { + obj_cache_list.obj_cache_max_size = 512; + obj_cache_list.pdc_obj_cache = (pdc_obj_cache*) malloc(sizeof(pdc_obj_cache) * obj_cache_list.obj_cache_max_size); + } else { + if (obj_cache_list.obj_cache_size == obj_cache_list.obj_cache_max_size) { + obj_cache_list.obj_cache_max_size *= 2; + temp = (pdc_obj_cache*) malloc(sizeof(pdc_obj_cache) * obj_cache_list.obj_cache_max_size); + memcpy(temp, obj_cache_list.pdc_obj_cache, sizeof(pdc_obj_cache) * obj_cache_list.obj_cache_size); + } + } + obj_cache_list.pdc_obj_cache[obj_cache_list.obj_cache_size].region_obj_cache_max_size = 512; + obj_cache_list.pdc_obj_cache[obj_cache_list.obj_cache_size].region_obj_cache_size = 0; + obj_cache_list.pdc_obj_cache[obj_cache_list.obj_cache_size].region_cache = (pdc_obj_cache*) malloc(sizeof(pdc_obj_cache) * obj_cache_list.pdc_obj_cache[obj_cache_list.obj_cache_size].region_obj_cache_max_size); + obj_cache_list.obj_cache_size++; + obj_cache = obj_cache_list.pdc_obj_cache + obj_cache_list.obj_cache_size; + } + if (obj_cache->region_obj_cache_max_size == 0) { + obj_cache->region_obj_cache_max_size = 512; + obj_cache->region_cache = (struct pdc_region_info *) malloc(sizeof(struct pdc_region_info) * obj_cache->region_obj_cache_max_size); + } else { + if (obj_cache->region_obj_cache_max_size == obj_cache->region_obj_cache_size) { + obj_cache->region_obj_cache_max_size *= 2; + temp2 = (struct pdc_region_info*) malloc(sizeof(struct pdc_region_info) * obj_cache->region_obj_cache_max_size); + memcpy(temp2, obj_cache->region_cache, sizeof(struct pdc_region_info) * obj_cache->region_obj_cache_size); + } + } + region_cache = obj_cache->region_cache + obj_cache->region_obj_cache_size; + region_cache->ndim = ndim; + region_cache->offset = (uint64_t*) malloc(sizeof(uint64_t) * ndim); + region_cache->size = (uint64_t*) malloc(sizeof(uint64_t) * ndim); + region_cache->buf = (char*) malloc(sizeof(char) * buf_size); + region_cache->unit = unit; + + memcpy(region_cache->offset, offset, sizeof(uint64_t) * ndim); + memcpy(region_cache->size, size, sizeof(uint64_t) * ndim); + memcpy(region_cache->buf, buf, sizeof(char) * buf_size); + + obj_cache->region_obj_cache_size++; + return 0; +} + +int PDC_region_flush(uint64_t obj_id) { + pdc_obj_cache *obj_cache = NULL; + int i; + struct pdc_region_info *region_cache; + + for ( i = 0; i < obj_cache_list.obj_cache_size; ++i ) { + if (obj_cache_list.pdc_obj_cache[i].obj_id == obj_id) { + obj_cache = obj_cache_list.pdc_obj_cache + i; + } + } + + for ( i = 0; i < obj_cache->region_obj_cache_size; ++i ) { + region_cache = obj_cache->region_cache + i; + PDC_Server_data_write_out(obj_id, region_cache, region_cache->buf, region_cache->unit); + } + return 0; +} + +int PDC_region_fetch(uint64_t obj_id, struct pdc_region_info *region_info, void *buf, size_t unit) { + pdc_obj_cache *obj_cache = NULL; + int i, flag = 1; + size_t j; + struct pdc_region_info *region_cache = NULL; + + for ( i = 0; i < obj_cache_list.obj_cache_size; ++i ) { + if (obj_cache_list.pdc_obj_cache[i].obj_id == obj_id) { + obj_cache = obj_cache_list.pdc_obj_cache + i; + } + } + + for ( i = 0; i < obj_cache->region_obj_cache_size; ++i ) { + flag = 1; + region_cache = obj_cache->region_cache + i; + for ( j = 0; j < region_info->ndim; ++j ) { + if ( region_info->offset[j] < region_cache->offset[j] || region_info->offset[j] + region_info->size[i] > region_cache->offset[j] + region_cache->size[i] ) { + flag = 0; + } + } + if (flag) { + break; + } else { + region_cache = NULL; + } + } + PDC_region_flush(obj_id); + PDC_Server_data_read_from(obj_id, region_info, buf, unit); + return 0; +} + +int PDC_region_cache_free() { + int i, j; + for ( i = 0; i < obj_cache_list.obj_cache_size; ++i ) { + for ( j = 0; j < obj_cache_list.pdc_obj_cache[i].region_obj_cache_size; ++j ) { + free(obj_cache_list.pdc_obj_cache[i].region_cache[j].offset); + free(obj_cache_list.pdc_obj_cache[i].region_cache[j].size); + free(obj_cache_list.pdc_obj_cache[i].region_cache[j].buf); + } + free(obj_cache_list.pdc_obj_cache[i].region_cache); + } + free(obj_cache_list.pdc_obj_cache); + return 0; +} + +perr_t PDC_Server_data_write_out2(uint64_t obj_id, struct pdc_region_info *region_info, void *buf, size_t unit) { perr_t ret_value = SUCCEED; - ssize_t write_bytes = -1; + uint64_t write_bytes = -1; data_server_region_t *region = NULL; FUNC_ENTER(NULL); + + // Write 1GB at a time + uint64_t write_size, max_write_size = 1073741824; + if(region_info->ndim >= 1) + write_size = unit*region_info->size[0]; + if(region_info->ndim >= 2) + write_size *= region_info->size[1]; + if(region_info->ndim >= 3) + write_size *= region_info->size[2]; + region = PDC_Server_get_obj_region(obj_id); if(region == NULL) { printf("cannot locate file handle\n"); @@ -4352,12 +4479,13 @@ perr_t PDC_Server_data_write_out(uint64_t obj_id, struct pdc_region_info *region } region_list_t *storage_region = (region_list_t*)calloc(1, sizeof(region_list_t)); - for (int i = 0; i < region_info->ndim; i++) { + for (size_t i = 0; i < region_info->ndim; i++) { storage_region->start[i] = region_info->offset[i]; storage_region->count[i] = region_info->size[i]; } storage_region->unit_size = unit; storage_region->offset = lseek(region->fd, 0, SEEK_END); + strcpy(storage_region->storage_location, region->storage_location); /* time_t t; */ /* struct tm tm; */ @@ -4371,18 +4499,10 @@ perr_t PDC_Server_data_write_out(uint64_t obj_id, struct pdc_region_info *region gettimeofday(&pdc_timer_start, 0); #endif - // Write 1GB at a time - uint64_t write_size, max_write_size = 1073741824; - if(region_info->ndim >= 1) - write_size = unit*region_info->size[0]; - if(region_info->ndim >= 2) - write_size *= region_info->size[1]; - if(region_info->ndim >= 3) - write_size *= region_info->size[2]; - write_bytes = 0; while (write_size > max_write_size) { write_bytes += write(region->fd, buf, max_write_size); + buf += max_write_size; write_size -= max_write_size; } write_bytes += write(region->fd, buf, write_size); @@ -4407,6 +4527,32 @@ perr_t PDC_Server_data_write_out(uint64_t obj_id, struct pdc_region_info *region storage_region->data_size = write_bytes; DL_APPEND(region->region_storage_head, storage_region); + printf("==PDC_SERVER[%d]: write region %llu bytes\n", pdc_server_rank_g, storage_region->data_size); +done: + fflush(stdout); + FUNC_LEAVE(ret_value); +} + +perr_t PDC_Server_data_write_out(uint64_t obj_id, struct pdc_region_info *region_info, void *buf, size_t unit) +{ + perr_t ret_value = SUCCEED; + + FUNC_ENTER(NULL); + + // Write 1GB at a time +/* + uint64_t write_size; + if(region_info->ndim >= 1) + write_size = unit*region_info->size[0]; + if(region_info->ndim >= 2) + write_size *= region_info->size[1]; + if(region_info->ndim >= 3) + write_size *= region_info->size[2]; + + PDC_region_cache_register(obj_id, buf, write_size * unit, region_info->offset, region_info->size, region_info->ndim, unit); +*/ + PDC_Server_data_write_out2(obj_id, region_info, buf, unit); + done: fflush(stdout); FUNC_LEAVE(ret_value); @@ -4444,7 +4590,7 @@ perr_t PDC_Server_data_read_from(uint64_t obj_id, struct pdc_region_info *region region_list_t *storage_region = NULL; DL_FOREACH(region->region_storage_head, elt) { flag = 0; - for (int i = 0; i < region_info->ndim; i++) { + for (size_t i = 0; i < region_info->ndim; i++) { if (elt->start[i]> region_info->offset[i] || (elt->start[i]+elt->count[i])< region_info->offset[i]+region_info->size[i]) { flag = 1; break; @@ -6566,6 +6712,7 @@ PDC_Server_query_evaluate_merge_opt(pdc_query_t *query, query_task_t *task, pdc_ } // Skip non-overlap regions with the region constraint + if (region_constraint && region_constraint->ndim > 0) { if (PDC_is_contiguous_region_overlap(region_elt, region_constraint) != 1) continue; diff --git a/src/server/pdc_server_data.h b/src/server/pdc_server_data.h index 1fac1e7e4..6ba01eeb7 100644 --- a/src/server/pdc_server_data.h +++ b/src/server/pdc_server_data.h @@ -309,6 +309,22 @@ extern char *gBinningOption; extern int gen_fastbit_idx_g; extern int use_fastbit_idx_g; +typedef struct { + struct pdc_region_info *region_cache; + uint64_t obj_id; + int region_obj_cache_size; + int region_obj_cache_max_size; + int unit; +} pdc_obj_cache; + +typedef struct { + pdc_obj_cache *pdc_obj_cache; + int obj_cache_size; + int obj_cache_max_size; +} pdc_cache; + +pdc_cache obj_cache_list; + /***************************************/ /* Library-private Function Prototypes */ /***************************************/ diff --git a/src/tests/read_write_col_perf.c b/src/tests/read_write_col_perf.c index 70ecc215e..47d329e2a 100644 --- a/src/tests/read_write_col_perf.c +++ b/src/tests/read_write_col_perf.c @@ -309,6 +309,7 @@ int main(int argc, char **argv) { } } #if PDC_TIMING == 1 + MPI_Barrier(MPI_COMM_WORLD); PDC_timing_report("read"); #endif // close a container