From 13fb9af6bad2bf2ec7718287b68a6a635dc27e54 Mon Sep 17 00:00:00 2001 From: Houjun Tang Date: Mon, 2 Dec 2024 15:39:53 -0500 Subject: [PATCH] EQSIM benchmark code and fixes (#213) * Update getting_started.rst (#184) * Removing gres option for ctest (#182) * Removing gres option for ctest * Removing gres option from scripts * Update check for core --------- Co-authored-by: Hyunju Oh Co-authored-by: Jean Luca Bez * enable cache by default (#187) * Benchmark code for EQSIM data * Committing clang-format changes * Minor adjustments * Committing clang-format changes * Updates * Committing clang-format changes * Change vpicio to use local server partitioning, add some debug prints * Committing clang-format changes * Add metadata query to benchmark code * Committing clang-format changes * Add ZFP compression for read and write * Committing clang-format changes * Add a option to use more ranks to read data so total data of each rank is less than the 4GB chunk limit * Committing clang-format changes * Add a data query code for EQSIM data * Committing clang-format changes * Minor adjustments for the HDF5 read code * Committing clang-format changes * Fix an issue with periodic data flush, minor changes to benchmark code * Committing clang-format changes * fix an issue with 3d read segfault * Committing clang-format changes * Fix compile issue * Update .gitlab-ci.yml * Update sleep time * Replace function * Replace function * Minor updates and doc changes * Committing clang-format changes * Update --------- Co-authored-by: Hyunju Oh Co-authored-by: Hyunju Oh Co-authored-by: Jean Luca Bez Co-authored-by: github-actions --- CMakeLists.txt | 14 +- docs/source/developer-notes.rst | 17 +- src/api/pdc_region/pdc_region_transfer.c | 16 +- src/commons/utils/pdc_timing.c | 1 + src/server/CMakeLists.txt | 10 + src/server/pdc_server.c | 27 +- .../pdc_server_region/pdc_server_data.c | 185 ++++++++- .../pdc_server_region_cache.c | 50 ++- .../pdc_server_region_transfer.c | 3 + src/tests/run_checkpoint_restart_test.sh | 1 + src/tests/vpicio.c | 1 + src/tools/CMakeLists.txt | 4 + src/tools/hdf5_access_eqsim.c | 382 ++++++++++++++++++ src/tools/pdc_access_eqsim.c | 343 ++++++++++++++++ src/tools/pdc_import_eqsim.c | 213 ++++++++++ src/tools/pdc_query_eqsim.c | 241 +++++++++++ 16 files changed, 1442 insertions(+), 66 deletions(-) create mode 100644 src/tools/hdf5_access_eqsim.c create mode 100644 src/tools/pdc_access_eqsim.c create mode 100644 src/tools/pdc_import_eqsim.c create mode 100644 src/tools/pdc_query_eqsim.c diff --git a/CMakeLists.txt b/CMakeLists.txt index 7e59417c..e2375a50 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -388,10 +388,10 @@ endif() option(PDC_SERVER_CACHE "Enable Server Caching." 
ON) if(PDC_SERVER_CACHE) set(PDC_SERVER_CACHE 1) - set(PDC_SERVER_CACHE_MAX_GB "3" CACHE STRING "Max GB for server cache") - set(PDC_SERVER_CACHE_FLUSH_TIME "30" CACHE STRING "Flush time for server cache") + set(PDC_SERVER_CACHE_MAX_GB "32" CACHE STRING "Max GB for server cache") + set(PDC_SERVER_IDLE_CACHE_FLUSH_TIME "3" CACHE STRING "Idle time to initiate flush from server cache") - add_compile_definitions(PDC_SERVER_CACHE_MAX_GB=${PDC_SERVER_CACHE_MAX_GB} PDC_SERVER_CACHE_FLUSH_TIME=${PDC_SERVER_CACHE_FLUSH_TIME}) + add_compile_definitions(PDC_SERVER_CACHE_MAX_GB=${PDC_SERVER_CACHE_MAX_GB} PDC_SERVER_IDLE_CACHE_FLUSH_TIME=${PDC_SERVER_IDLE_CACHE_FLUSH_TIME}) endif() @@ -487,6 +487,14 @@ if(PDC_ENABLE_SQLITE3) set(ENABLE_SQLITE3 1) endif() +#----------------------------------------------------------------------------- +# ZFP option +#----------------------------------------------------------------------------- +option(PDC_ENABLE_ZFP "Enable ZFP." OFF) +if(PDC_ENABLE_ZFP) + set(ENABLE_ZFP 1) +endif() + # Check availability of symbols #----------------------------------------------------------------------------- check_symbol_exists(malloc_usable_size "malloc.h" HAVE_MALLOC_USABLE_SIZE) diff --git a/docs/source/developer-notes.rst b/docs/source/developer-notes.rst index 7b6f95fe..ef975cb0 100644 --- a/docs/source/developer-notes.rst +++ b/docs/source/developer-notes.rst @@ -333,6 +333,11 @@ Server Nonblocking Control By design, the region transfer request start does not guarantee the finish of data transfer or server I/O. In fact, this function should return to the application as soon as possible. Data transfer and server I/O can occur in the background so that client applications can take advantage of overlapping timings between application computations and PDC data management. +Server Data Cache +--------------------------------------------- + +PDC supports a server-side write data cache, which is enabled by default through the CMake option ``PDC_SERVER_CACHE``. Each time the server receives a region write request, it caches the data in the server's memory without writing it to the file system. The server monitors both the total amount of cached data and how long it has gone without receiving any I/O requests to determine when to flush the data from cache to the file system. Two additional CMake options ``PDC_SERVER_CACHE_MAX_GB`` and ``PDC_SERVER_IDLE_CACHE_FLUSH_TIME`` can be set to adjust the cache flush behavior. When the cached data size reaches the limit or the server has been idle longer than the idle time, the flush operation is triggered. With the idle time trigger, when a new I/O request arrives during the flush, PDC will stop before flushing the next region and reset the timer to avoid interfering with the client's I/O. Setting ``export PDC_SERVER_CACHE_NO_FLUSH=1`` disables the flush operation and keeps the data in cache. + Server Region Transfer Request Start --------------------------------------------- @@ -343,6 +348,11 @@ Then, ``PDC_commit_request`` is called for request registration. This operation Finally, the server RPC returns a finished code to the client so that the client can return to the application immediately. +Server Region Transfer Data Sieving +--------------------------------------------- +When reading a 2D/3D region, the PDC server uses data sieving when only a subset of a storage region is requested, which can improve read performance. The entire storage region is read as a contiguous chunk and the requested subset is extracted before sending the data to the client. 
Setting ``export PDC_DATA_SIEVING=0`` before running the server will disable this feature. + + Server Region Transfer Request Wait --------------------------------------------- @@ -373,6 +383,11 @@ However, when a new region is written to an object, it is necessary to scan all I/O by region will store repeated bytes when write requests contain overlapping parts. In addition, the region update mechanism generates extra I/O operations. This is one of its disadvantages. Optimization for region search (as R trees) in the future can relieve this problem. +Storage Compression (Prototype) +--------------------------------------------- + +PDC has partial support for storing the compressed data of each storage region with the ZFP compression library. Currently, the compression is hard-coded to ZFP's accuracy mode. + +++++++++++++++++++++++++++++++++++++++++++++ Contributing to PDC project +++++++++++++++++++++++++++++++++++++++++++++ @@ -560,4 +575,4 @@ But if you need to debug the server, you can prepend ``srun`` with ``ddt --conne rm -rf ./pdc_tmp # optional if you need to clean up the PDC tmp directory ddt --connect srun -N 1 -n 4 -c 2 --mem=25600 --cpu_bind=cores ./bin/pdc_server.exe & -We recommend to use 1 node when debugging PDC, but if memory is not sufficient, you can use more nodes. \ No newline at end of file +We recommend to use 1 node when debugging PDC, but if memory is not sufficient, you can use more nodes. diff --git a/src/api/pdc_region/pdc_region_transfer.c b/src/api/pdc_region/pdc_region_transfer.c index 9297796a..1d4f6a71 100644 --- a/src/api/pdc_region/pdc_region_transfer.c +++ b/src/api/pdc_region/pdc_region_transfer.c @@ -1789,25 +1789,13 @@ release_region_buffer(char *buf, uint64_t *obj_dims, int local_ndim, uint64_t *l if (local_ndim == 2) { if (access_type == PDC_READ) { ptr = new_buf; - for (i = 0; i < local_size[0]; ++i) { - memcpy(buf + ((local_offset[0] + i) * obj_dims[1] + local_offset[1]) * unit, ptr, - local_size[1] * unit); - ptr += local_size[1] * unit; - } + memcpy(buf, ptr, local_size[0] * local_size[1] * unit); } } else if (local_ndim == 3) { if (access_type == PDC_READ) { ptr = new_buf; - for (i = 0; i < local_size[0]; ++i) { - for (j = 0; j < local_size[1]; ++j) { - memcpy(buf + ((local_offset[0] + i) * obj_dims[1] * obj_dims[2] + - (local_offset[1] + j) * obj_dims[2] + local_offset[2]) * - unit, - ptr, local_size[2] * unit); - ptr += local_size[2] * unit; - } - } + memcpy(buf, ptr, local_size[0] * local_size[1] * local_size[2] * unit); } } if (bulk_buf_ref) { diff --git a/src/commons/utils/pdc_timing.c b/src/commons/utils/pdc_timing.c index cc8600b4..75080dd0 100644 --- a/src/commons/utils/pdc_timing.c +++ b/src/commons/utils/pdc_timing.c @@ -1,5 +1,6 @@ #include "pdc_timing.h" #include "assert.h" +#include "mpi.h" #ifdef PDC_TIMING static double pdc_base_time; diff --git a/src/server/CMakeLists.txt b/src/server/CMakeLists.txt index 0d7bae54..55034d34 100644 --- a/src/server/CMakeLists.txt +++ b/src/server/CMakeLists.txt @@ -17,6 +17,13 @@ if(PDC_ENABLE_SQLITE3) find_package(SQLite3 3.31.0 REQUIRED) endif() +if(PDC_ENABLE_ZFP) + add_definitions(-DENABLE_ZFP=1) + find_package(ZFP REQUIRED) + # find_path(ZFP_INCLUDE_DIR include/zfp.h) +endif() + + include_directories( ${PDC_COMMON_INCLUDE_DIRS} ${PDC_INCLUDES_BUILD_TIME} @@ -40,6 +47,7 @@ include_directories( ${MERCURY_INCLUDE_DIR} ${FASTBIT_INCLUDE_DIR} ${ROCKSDB_INCLUDE_DIR} + ${ZFP_INCLUDE_DIRS} ) add_definitions( -DIS_PDC_SERVER=1 ) @@ -70,6 +78,8 @@ add_library(pdc_server_lib if(PDC_ENABLE_FASTBIT) 
message(STATUS "Enabled fastbit") target_link_libraries(pdc_server_lib ${MERCURY_LIBRARY} ${PDC_COMMONS_LIBRARIES} -lm -ldl ${PDC_EXT_LIB_DEPENDENCIES} ${FASTBIT_LIBRARY}/libfastbit.so) +elseif(PDC_ENABLE_ZFP) + target_link_libraries(pdc_server_lib ${MERCURY_LIBRARY} ${PDC_COMMONS_LIBRARIES} -lm -ldl ${PDC_EXT_LIB_DEPENDENCIES} zfp::zfp) elseif(PDC_ENABLE_ROCKSDB) if(PDC_ENABLE_SQLITE3) target_link_libraries(pdc_server_lib ${MERCURY_LIBRARY} ${PDC_COMMONS_LIBRARIES} -lm -ldl ${PDC_EXT_LIB_DEPENDENCIES} ${ROCKSDB_LIBRARY} SQLite::SQLite3) diff --git a/src/server/pdc_server.c b/src/server/pdc_server.c index 08f05fe1..e39f4f79 100644 --- a/src/server/pdc_server.c +++ b/src/server/pdc_server.c @@ -96,6 +96,7 @@ char ** all_addr_strings_g = NULL; int is_hash_table_init_g = 0; int lustre_stripe_size_mb_g = 16; int lustre_total_ost_g = 0; +int pdc_disable_checkpoint_g = 0; hg_id_t get_remote_metadata_register_id_g; hg_id_t buf_map_server_register_id_g; @@ -719,15 +720,9 @@ PDC_Server_set_close(void) #ifdef PDC_TIMING start = MPI_Wtime(); #endif - char *tmp_env_char = getenv("PDC_DISABLE_CHECKPOINT"); - if (tmp_env_char != NULL && strcmp(tmp_env_char, "TRUE") == 0) { - if (pdc_server_rank_g == 0) { - printf("==PDC_SERVER[0]: checkpoint disabled!\n"); - } - } - else { + if (pdc_disable_checkpoint_g == 0) PDC_Server_checkpoint(); - } + #ifdef PDC_TIMING pdc_server_timings->PDCserver_checkpoint += MPI_Wtime() - start; #endif @@ -1204,7 +1199,8 @@ PDC_Server_recv_shm_cb(const struct hg_cb_info *callback_info) hg_return_t PDC_Server_checkpoint_cb() { - PDC_Server_checkpoint(); + if (pdc_disable_checkpoint_g == 0) + PDC_Server_checkpoint(); return HG_SUCCESS; } @@ -1862,7 +1858,7 @@ PDC_Server_loop(hg_context_t *hg_context) #ifdef PDC_ENABLE_CHECKPOINT checkpoint_interval++; // Avoid calling clock() every operation - if (checkpoint_interval % PDC_CHECKPOINT_CHK_OP_INTERVAL == 0) { + if (pdc_disable_checkpoint_g == 0 && checkpoint_interval % PDC_CHECKPOINT_CHK_OP_INTERVAL == 0) { cur_time = clock(); double elapsed_time = ((double)(cur_time - last_checkpoint_time)) / CLOCKS_PER_SEC; /* fprintf(stderr, "PDC_SERVER: loop elapsed time %.2f\n", elapsed_time); */ @@ -2117,7 +2113,7 @@ PDC_Server_get_env() data_sieving_g = atoi(tmp_env_char); } else { - data_sieving_g = 0; + data_sieving_g = 1; } // Get number of OST per file @@ -2158,7 +2154,7 @@ PDC_Server_get_env() tmp_env_char = getenv("PDC_GEN_HIST"); if (tmp_env_char != NULL) - gen_hist_g = 1; + gen_hist_g = atoi(tmp_env_char); tmp_env_char = getenv("PDC_GEN_FASTBIT_IDX"); if (tmp_env_char != NULL) @@ -2184,6 +2180,13 @@ PDC_Server_get_env() printf("==PDC_SERVER[%d]: using SQLite3 for kvtag\n", pdc_server_rank_g); } + tmp_env_char = getenv("PDC_DISABLE_CHECKPOINT"); + if (tmp_env_char != NULL && strcmp(tmp_env_char, "TRUE") == 0) { + pdc_disable_checkpoint_g = 1; + if (pdc_server_rank_g == 0) + printf("==PDC_SERVER[0]: checkpoint disabled!\n"); + } + if (pdc_server_rank_g == 0) { printf("==PDC_SERVER[%d]: using [%s] as tmp dir, %d OSTs, %d OSTs per data file, %d%% to BB\n", pdc_server_rank_g, pdc_server_tmp_dir_g, lustre_total_ost_g, pdc_nost_per_file_g, diff --git a/src/server/pdc_server_region/pdc_server_data.c b/src/server/pdc_server_region/pdc_server_data.c index ab466aa4..ca01af2c 100644 --- a/src/server/pdc_server_region/pdc_server_data.c +++ b/src/server/pdc_server_region/pdc_server_data.c @@ -44,6 +44,10 @@ #include "mpi.h" #endif +#ifdef ENABLE_ZFP +#include "zfp.h" +#endif + #include "pdc_utlist.h" #include "pdc_public.h" #include 
"pdc_interface.h" @@ -284,9 +288,7 @@ PDC_Server_get_obj_region(pdcid_t obj_id) { data_server_region_t *ret_value = NULL; data_server_region_t *elt = NULL; - FUNC_ENTER(NULL); - if (dataserver_region_g != NULL) { DL_FOREACH(dataserver_region_g, elt) { @@ -294,6 +296,24 @@ PDC_Server_get_obj_region(pdcid_t obj_id) ret_value = elt; } } + FUNC_LEAVE(ret_value); +} + +pdc_data_server_io_list_t * +PDC_Server_get_obj_region_query(pdcid_t obj_id) +{ + pdc_data_server_io_list_t *ret_value = NULL; + pdc_data_server_io_list_t *elt = NULL; + + FUNC_ENTER(NULL); + + if (pdc_data_server_write_list_head_g != NULL) { + DL_FOREACH(pdc_data_server_write_list_head_g, elt) + { + if (elt->obj_id == obj_id) + ret_value = elt; + } + } FUNC_LEAVE(ret_value); } @@ -4718,7 +4738,37 @@ PDC_Server_posix_write(int fd, void *buf, uint64_t write_size) FUNC_LEAVE(ret_value); } -// No PDC_SERVER_CACHE +#ifdef ENABLE_ZFP +static zfp_field * +_setup_zfp(struct pdc_region_info *region_info, zfp_stream **zfp) +{ + zfp_type type = zfp_type_double; + zfp_field *field; + + if (region_info->unit == 8) + type = zfp_type_double; + else if (region_info->unit == 4) + type = zfp_type_int32; + else + fprintf(stderr, "==PDC_SERVER[%d]: unit has size %u not expected!\n", pdc_server_rank_g, + region_info->unit); + + if (region_info->ndim == 1) + field = zfp_field_1d(region_info->buf, type, region_info->size[0]); + else if (region_info->ndim == 2) + field = zfp_field_2d(region_info->buf, type, region_info->size[0], region_info->size[1]); + else if (region_info->ndim == 3) + field = zfp_field_3d(region_info->buf, type, region_info->size[0], region_info->size[1], + region_info->size[2]); + + *zfp = zfp_stream_open(NULL); + // TODO: precision mode for now + zfp_stream_set_accuracy(*zfp, 0.01); + + return field; +} +#endif + perr_t PDC_Server_data_write_out(uint64_t obj_id, struct pdc_region_info *region_info, void *buf, size_t unit) { @@ -4818,7 +4868,7 @@ PDC_Server_data_write_out(uint64_t obj_id, struct pdc_region_info *region_info, // No need to update metadata } else { - + // data_sieving_g = 1 by default, export PDC_DATA_SIEVING=0 to change if (data_sieving_g) { // Create a buffer for loading the entire region. 
tmp_buf = (char *)malloc(overlap_region->data_size); @@ -4827,7 +4877,8 @@ PDC_Server_data_write_out(uint64_t obj_id, struct pdc_region_info *region_info, #endif if (pread(region->fd, tmp_buf, overlap_region->data_size, overlap_region->offset) != (ssize_t)overlap_region->data_size) { - printf("==PDC_SERVER[%d]: pread failed to read enough bytes\n", pdc_server_rank_g); + printf("==PDC_SERVER[%d]: pread failed to read enough bytes %d\n", pdc_server_rank_g, + __LINE__); } #ifdef PDC_TIMING pdc_server_timings->PDCdata_server_read_posix += MPI_Wtime() - start_posix; @@ -5013,7 +5064,44 @@ PDC_Server_data_write_out(uint64_t obj_id, struct pdc_region_info *region_info, } if (is_contained == 0) { request_region->offset = lseek(region->fd, 0, SEEK_END); -// printf("posix write for position %d with write size %u\n", 0, (unsigned)write_size); + + // debug + /* fprintf(stderr, "==PDC_SERVER[%d]: %s posix write for position %d with write size %u\n", */ + /* pdc_server_rank_g, __func__, 0, (unsigned)write_size); */ +#ifdef ENABLE_ZFP + zfp_field * field; + zfp_stream *zfp; + size_t bufsize; + void * buffer; + bitstream * stream; + + field = _setup_zfp(region_info, &zfp); + if (field == NULL) + fprintf(stderr, "==PDC_SERVER[%d]: _setup_zfp failed!\n", pdc_server_rank_g); + else { + + bufsize = zfp_stream_maximum_size(zfp, field); + if (bufsize == 0) + fprintf(stderr, "==PDC_SERVER[%d]: zfp_stream_maximum_size returned 0!\n", pdc_server_rank_g); + buffer = malloc(bufsize); + if (buffer == 0) + fprintf(stderr, "==PDC_SERVER[%d]: malloc failed!\n", pdc_server_rank_g); + else { + stream = stream_open(buffer, bufsize); + zfp_stream_set_bit_stream(zfp, stream); + // Compress the data and overwrite the write_size for the following posix write + size_t compress_size = zfp_compress(zfp, field); + fprintf(stderr, "==PDC_SERVER[%d]: zfp compressed size %lu / %llu CR=%.2lf\n", + pdc_server_rank_g, compress_size, write_size, (double)write_size / compress_size); + buf = buffer; + write_size = compress_size; + } + } + zfp_field_free(field); + zfp_stream_close(zfp); + stream_close(stream); +#endif + #ifdef PDC_TIMING start_posix = MPI_Wtime(); #endif @@ -5054,7 +5142,6 @@ PDC_Server_data_write_out(uint64_t obj_id, struct pdc_region_info *region_info, FUNC_LEAVE(ret_value); } // End PDC_Server_data_write_out -// No PDC_SERVER_CACHE perr_t PDC_Server_data_read_from(uint64_t obj_id, struct pdc_region_info *region_info, void *buf, size_t unit) { @@ -5118,7 +5205,8 @@ PDC_Server_data_read_from(uint64_t obj_id, struct pdc_region_info *region_info, if (pread(region->fd, buf + (overlap_offset[0] - region_info->offset[0]) * unit, overlap_size[0] * unit, overlap_region->offset + pos) != (ssize_t)(overlap_size[0] * unit)) { - printf("==PDC_SERVER[%d]: pread failed to read enough bytes\n", pdc_server_rank_g); + printf("==PDC_SERVER[%d]: pread failed to read enough bytes %d\n", pdc_server_rank_g, + __LINE__); } #ifdef PDC_TIMING pdc_server_timings->PDCdata_server_read_posix += MPI_Wtime() - start_posix; @@ -5133,18 +5221,87 @@ PDC_Server_data_read_from(uint64_t obj_id, struct pdc_region_info *region_info, // No need to update metadata } else { + // data_sieving_g = 1 by default, export PDC_DATA_SIEVING=0 to change if (data_sieving_g) { tmp_buf = (char *)malloc(overlap_region->data_size); #ifdef PDC_TIMING start_posix = MPI_Wtime(); #endif - if (pread(region->fd, tmp_buf, overlap_region->data_size, overlap_region->offset) != - (ssize_t)overlap_region->data_size) { - printf("==PDC_SERVER[%d]: pread failed to read enough bytes\n", 
pdc_server_rank_g); + // Read up to 1GB at a time + uint64_t read_max = 1 * 1024 * 1024 * 1024llu; + if (overlap_region->data_size > read_max) { + uint64_t buf_off = 0; + uint64_t reg_off = overlap_region->offset; + uint64_t leftover = overlap_region->data_size; + ssize_t read_size = read_max; + + while (leftover > 0) { + /* printf("==PDC_SERVER[%d]: pread %llu, leftover %llu\n", pdc_server_rank_g, + * read_size, leftover); */ + if (pread(region->fd, tmp_buf + buf_off, read_size, reg_off) != read_size) { + printf("==PDC_SERVER[%d]: pread failed to read enough bytes %llu, LINE %d\n", + pdc_server_rank_g, read_size, __LINE__); + } + reg_off += read_size; + buf_off += read_size; + leftover -= read_size; + if (leftover < read_size) + read_size = leftover; + } + } + else { + if (pread(region->fd, tmp_buf, overlap_region->data_size, overlap_region->offset) != + (ssize_t)overlap_region->data_size) { + printf("==PDC_SERVER[%d]: pread failed to read enough bytes %d\n", + pdc_server_rank_g, __LINE__); + } } #ifdef PDC_TIMING pdc_server_timings->PDCdata_server_read_posix += MPI_Wtime() - start_posix; #endif + +#ifdef ENABLE_ZFP + // Uncompress the data + zfp_field * field; + zfp_stream *zfp; + size_t bufsize, decompress_size; + bitstream * stream; + + field = _setup_zfp(region_info, &zfp); + if (field == NULL) + fprintf(stderr, "==PDC_SERVER[%d]: _setup_zfp failed!\n", pdc_server_rank_g); + else { + + if (region_info->ndim >= 1) + decompress_size = region_info->unit * region_info->size[0]; + if (region_info->ndim >= 2) + decompress_size *= region_info->size[1]; + if (region_info->ndim >= 3) + decompress_size *= region_info->size[2]; + if (decompress_size == 0) + fprintf(stderr, "==PDC_SERVER[%d]: zfp_stream_maximum_size returned 0!\n", + pdc_server_rank_g); + + void *decompress_buffer = malloc(decompress_size); + if (decompress_buffer == 0) + fprintf(stderr, "==PDC_SERVER[%d]: malloc failed!\n", pdc_server_rank_g); + else { + stream = stream_open(decompress_buffer, decompress_size); + zfp_stream_set_bit_stream(zfp, stream); + // Decompress the data + decompress_size = zfp_decompress(zfp, field); + if (decompress_size == 0) + fprintf(stderr, "==PDC_SERVER[%d]: zfp_decompress failed!\n", + pdc_server_rank_g); + free(tmp_buf); + tmp_buf = decompress_buffer; + } + } + zfp_field_free(field); + zfp_stream_close(zfp); + stream_close(stream); +#endif + memcpy_overlap_subregion(region_info->ndim, unit, tmp_buf, overlap_region->start, overlap_region->count, buf, region_info->offset, region_info->size, overlap_offset, overlap_size); @@ -7789,7 +7946,7 @@ attach_local_storage_region_to_query(pdc_query_t *query) { /* pdc_metadata_t *meta; */ - data_server_region_t *obj_reg; + pdc_data_server_io_list_t *obj_reg; if (NULL == query->constraint) { printf("==PDC_SERVER[%d]: %s - query->constraint is NULL!\n", pdc_server_rank_g, __func__); @@ -7805,12 +7962,12 @@ attach_local_storage_region_to_query(pdc_query_t *query) /* } */ /* query->constraint->storage_region_list_head = meta->storage_region_list_head; */ - obj_reg = PDC_Server_get_obj_region(query->constraint->obj_id); + obj_reg = PDC_Server_get_obj_region_query(query->constraint->obj_id); if (obj_reg == NULL) { printf("==PDC_SERVER[%d]: %s - cannot find region from object!\n", pdc_server_rank_g, __func__); } else - query->constraint->storage_region_list_head = obj_reg->region_storage_head; + query->constraint->storage_region_list_head = obj_reg->region_list_head; return SUCCEED; } diff --git a/src/server/pdc_server_region/pdc_server_region_cache.c 
b/src/server/pdc_server_region/pdc_server_region_cache.c index 98cd5758..c150076e 100644 --- a/src/server/pdc_server_region/pdc_server_region_cache.c +++ b/src/server/pdc_server_region/pdc_server_region_cache.c @@ -6,16 +6,10 @@ #ifdef PDC_SERVER_CACHE -#ifdef PDC_SERVER_CACHE_MAX_SIZE -#define MAX_CACHE_SIZE PDC_SERVER_CACHE_MAX_GB * 1024 * 1024 * 1024 +#ifdef PDC_SERVER_CACHE_MAX_GB +#define MAX_CACHE_SIZE_GB PDC_SERVER_CACHE_MAX_GB #else -#define MAX_CACHE_SIZE 34359738368 -#endif - -#ifdef PDC_SERVER_CACHE_FLUSH_TIME -#define PDC_CACHE_FLUSH_TIME_INT PDC_SERVER_CACHE_FLUSH_TIME -#else -#define PDC_CACHE_FLUSH_TIME_INT 30 +#define MAX_CACHE_SIZE_GB 32 #endif #ifdef PDC_SERVER_IDLE_CACHE_FLUSH_TIME @@ -52,6 +46,7 @@ static int pdc_idle_flush_time_g; int PDC_region_server_cache_init() { + int server_rank = 0; char *p; pdc_recycle_close_flag = 0; @@ -60,10 +55,12 @@ PDC_region_server_cache_init() total_cache_size = 0; p = getenv("PDC_SERVER_CACHE_MAX_SIZE"); - if (p != NULL) - maximum_cache_size = atol(p); - else - maximum_cache_size = MAX_CACHE_SIZE; + if (p != NULL) { + maximum_cache_size = atol(p) * 1024llu * 1024llu * 1024llu; + } + else { + maximum_cache_size = MAX_CACHE_SIZE_GB * 1024llu * 1024llu * 1024llu; + } p = getenv("PDC_SERVER_IDLE_CACHE_FLUSH_TIME"); if (p != NULL) @@ -71,6 +68,12 @@ PDC_region_server_cache_init() else pdc_idle_flush_time_g = PDC_IDLE_CACHE_FLUSH_TIME_INT; +#ifdef ENABLE_MPI + MPI_Comm_rank(MPI_COMM_WORLD, &server_rank); +#endif + if (server_rank == 0) + fprintf(stderr, "==PDC_SERVER[%d]: max cache size: %llu\n", server_rank, maximum_cache_size); + obj_cache_list = NULL; obj_cache_list_end = NULL; @@ -500,6 +503,8 @@ PDC_region_cache_register(uint64_t obj_id, int obj_ndim, const uint64_t *obj_dim pthread_mutex_unlock(&pdc_obj_cache_list_mutex); + gettimeofday(&(obj_cache->timestamp), NULL); + if (total_cache_size > maximum_cache_size) { printf("==PDC_SERVER[%d]: server cache full %.1f / %.1f MB, will flush to storage\n", PDC_get_rank(), total_cache_size / 1048576.0, maximum_cache_size / 1048576.0); @@ -509,8 +514,6 @@ PDC_region_cache_register(uint64_t obj_id, int obj_ndim, const uint64_t *obj_dim // printf("created cache region at offset %llu, buf size %llu, unit = %ld, ndim = %ld, obj_id = %llu\n", // offset[0], buf_size, unit, ndim, (long long unsigned)obj_cache->obj_id); - gettimeofday(&(obj_cache->timestamp), NULL); - return 0; } @@ -721,7 +724,7 @@ PDC_region_cache_flush_by_pointer(uint64_t obj_id, pdc_obj_cache *obj_cache, int pdc_region_cache * region_cache_iter, *region_cache_temp; struct pdc_region_info * region_cache_info; uint64_t write_size = 0; - char ** buf, **new_buf, *buf_ptr = NULL; + char ** buf, **new_buf, *buf_ptr = NULL, *env_char; uint64_t * start, *end, *new_start, *new_end; int merged_request_size = 0; uint64_t unit; @@ -730,6 +733,11 @@ PDC_region_cache_flush_by_pointer(uint64_t obj_id, pdc_obj_cache *obj_cache, int #ifdef PDC_TIMING double start_time = MPI_Wtime(); #endif + env_char = getenv("PDC_SERVER_CACHE_NO_FLUSH"); + if (env_char && atoi(env_char) != 0) { + fprintf(stderr, "==PDC_SERVER[%d]: flushed disabled\n", PDC_get_rank()); + return 0; + } /* PDC_get_time_str(cur_time); */ /* printf("%s ==PDC_SERVER[%d.%d]: enter %s\n", cur_time, PDC_get_rank(), flag, __func__); */ @@ -758,6 +766,7 @@ PDC_region_cache_flush_by_pointer(uint64_t obj_id, pdc_obj_cache *obj_cache, int buf[i] = obj_regions[i]->buf; } free(obj_regions); + // Merge adjacent regions // printf("checkpoint @ line %d\n", __LINE__); merge_requests(start, end, 
obj_cache->region_cache_size, buf, &new_start, &new_end, &new_buf, unit, @@ -913,16 +922,12 @@ PDC_region_cache_clock_cycle(void *ptr) pdc_obj_cache *obj_cache, *obj_cache_iter; struct timeval current_time; struct timeval finish_time; - int nflush = 0; - double flush_frequency_s = PDC_CACHE_FLUSH_TIME_INT, elapsed_time; + int nflush = 0; + double elapsed_time; time_t t; struct tm * tm; char cur_time[64]; - char *p = getenv("PDC_SERVER_CACHE_FLUSH_FREQUENCY_S"); - if (p != NULL) - flush_frequency_s = atoi(p); - if (ptr == NULL) { obj_cache_iter = NULL; } @@ -982,6 +987,7 @@ PDC_region_cache_clock_cycle(void *ptr) pthread_mutex_unlock(&pdc_cache_mutex); break; } + pthread_mutex_unlock(&pdc_cache_mutex); usleep(10000); } diff --git a/src/server/pdc_server_region/pdc_server_region_transfer.c b/src/server/pdc_server_region/pdc_server_region_transfer.c index 0fb4d5a6..0361178c 100644 --- a/src/server/pdc_server_region/pdc_server_region_transfer.c +++ b/src/server/pdc_server_region/pdc_server_region_transfer.c @@ -314,6 +314,9 @@ PDC_Server_transfer_request_io(uint64_t obj_id, int obj_ndim, const uint64_t *ob PDC_get_rank(), PDC_get_rank()); PDC_mkdir(storage_location); + /* fprintf(stderr, "Rank %d, write to offset %llu, size %llu\n", server_rank, region_info->offset[0], + * region_info->size[0]); */ + fd = open(storage_location, O_RDWR | O_CREAT, 0666); if (region_info->ndim == 1) { // printf("server I/O checkpoint 1D\n"); diff --git a/src/tests/run_checkpoint_restart_test.sh b/src/tests/run_checkpoint_restart_test.sh index cb44f5bb..fa138f3e 100755 --- a/src/tests/run_checkpoint_restart_test.sh +++ b/src/tests/run_checkpoint_restart_test.sh @@ -5,6 +5,7 @@ # Cori CI needs srun even for serial tests run_cmd="" + if [[ "$SUPERCOMPUTER" == "perlmutter" ]]; then run_cmd="srun -n 1 --mem=25600 --cpu_bind=cores --overlap" fi diff --git a/src/tests/vpicio.c b/src/tests/vpicio.c index e7bd51f0..1fa71718 100644 --- a/src/tests/vpicio.c +++ b/src/tests/vpicio.c @@ -125,6 +125,7 @@ main(int argc, char **argv) // create an object property obj_prop_xx = PDCprop_create(PDC_OBJ_CREATE, pdc_id); + PDCprop_set_obj_transfer_region_type(obj_prop_xx, PDC_REGION_LOCAL); PDCprop_set_obj_dims(obj_prop_xx, 1, dims); PDCprop_set_obj_type(obj_prop_xx, PDC_FLOAT); PDCprop_set_obj_time_step(obj_prop_xx, 0); diff --git a/src/tools/CMakeLists.txt b/src/tools/CMakeLists.txt index c8eb78f7..b6834809 100644 --- a/src/tools/CMakeLists.txt +++ b/src/tools/CMakeLists.txt @@ -98,6 +98,10 @@ set(PROGRAMS pdc_import pdc_export pdc_ls + pdc_import_eqsim + pdc_access_eqsim + pdc_query_eqsim + hdf5_access_eqsim ) add_library(cjson cjson/cJSON.c) diff --git a/src/tools/hdf5_access_eqsim.c b/src/tools/hdf5_access_eqsim.c new file mode 100644 index 00000000..ca916b73 --- /dev/null +++ b/src/tools/hdf5_access_eqsim.c @@ -0,0 +1,382 @@ +#include +#include +#include +#include +#include +#include + +#include "mpi.h" + +#include "hdf5.h" + +int +main(int argc, char **argv) +{ + + hid_t file, grp, dset, fapl, dxpl, dapl, dspace, mspace, meta_dset, meta_dspace, meta_mspace; + herr_t status; + + int i, j, r, round = 1, count, total_count, rank = 0, nproc = 1, ssi_downsample, rec_downsample, + batchsize, iter, opensees_size, use_chunk_cache = 0; + int start_x[4096], start_y[4096], devide_factor = 1, readt; + hsize_t offset[4], size[4], stride[4], data_i; + hsize_t dims[4] = {4634, 19201, 12801, 1}, chunk_size[4] = {400, 600, 400, 1}; + // dims is 12x, 32x, 32x of chunk size + char * fname, *dname = "vel_0 ijk layout"; + double *data = NULL, t0, 
t1, t2, data_max, data_min, *ssi_data = NULL, *rec_data = NULL, + *opensees_data = NULL, meta_value[4]; + + MPI_Init(&argc, &argv); + MPI_Comm_rank(MPI_COMM_WORLD, &rank); + MPI_Comm_size(MPI_COMM_WORLD, &nproc); + + fname = argv[1]; + if (argc > 2) + round = atoi(argv[2]); + if (argc > 3) + use_chunk_cache = atoi(argv[3]); + if (argc > 4) + devide_factor = atoi(argv[4]); + + // Data size is more than 4GB per rank, need to reduce the first direction read size by + // a factor of 4 + readt = ceil(1.0 * dims[0] / chunk_size[0]) * chunk_size[0] / devide_factor; + + if (rank == 0) + fprintf(stderr, "Round %d, use chunk cache %d, devide factor %d\n", round, use_chunk_cache, + devide_factor); + + fapl = H5Pcreate(H5P_FILE_ACCESS); + H5Pset_fapl_mpio(fapl, MPI_COMM_WORLD, MPI_INFO_NULL); + + dxpl = H5Pcreate(H5P_DATASET_XFER); + H5Pset_dxpl_mpio(dxpl, H5FD_MPIO_COLLECTIVE); + + dapl = H5Pcreate(H5P_DATASET_ACCESS); + if (use_chunk_cache > 0) + H5Pset_chunk_cache(dapl, 1228800, 4294967295, 1); + + file = H5Fopen(fname, H5F_ACC_RDONLY, fapl); + if (file < 0) + fprintf(stderr, "Failed to open file [%s]\n", fname); + + // Assign chunks to each rank + count = 0; + for (i = 0; i < dims[1] / chunk_size[1]; i++) { + for (j = 0; j < dims[2] / chunk_size[2]; j++) { + start_x[count] = i; + start_y[count] = j; + count++; + } + } + + //=============Metadata Query========= + // Each rank read 4 values, simulating a lat lon range access + meta_dset = H5Dopen(file, "z coordinates", dapl); + meta_dspace = H5Dget_space(meta_dset); + + offset[0] = start_x[rank / devide_factor] * 2; + offset[1] = start_y[rank / devide_factor] * 2; + offset[2] = 0; + + size[0] = 2; + size[1] = 2; + size[2] = 1; + + H5Sselect_hyperslab(meta_dspace, H5S_SELECT_SET, offset, NULL, size, NULL); + + meta_mspace = H5Screate_simple(3, size, NULL); + + for (r = 0; r < round; r++) { + MPI_Barrier(MPI_COMM_WORLD); + t0 = MPI_Wtime(); + + H5Dread(meta_dset, H5T_NATIVE_DOUBLE, meta_mspace, meta_dspace, dxpl, meta_value); + + MPI_Barrier(MPI_COMM_WORLD); + t1 = MPI_Wtime(); + if (rank == 0) + fprintf(stderr, "Round %d: read metadata took %.4lf\n", r, t1 - t0); + } + + H5Dclose(meta_dset); + H5Sclose(meta_mspace); + H5Sclose(meta_dspace); + + //=============PATTERN 1=============== + // Read entire chunks, can be used for caching + offset[0] = (rank % devide_factor) * readt; + offset[1] = chunk_size[1] * start_x[rank / devide_factor]; + offset[2] = chunk_size[2] * start_y[rank / devide_factor]; + offset[3] = 0; + + if (rank % devide_factor == devide_factor - 1) + size[0] = dims[0] - readt * (devide_factor - 1); + else + size[0] = readt; + size[1] = chunk_size[1]; + size[2] = chunk_size[2]; + size[3] = 1; + + mspace = H5Screate_simple(4, size, NULL); + + data = (double *)malloc(sizeof(double) * size[0] * size[1] * size[2]); + + if (nproc <= 16) + fprintf(stderr, "Rank %d: offset %llu, %llu, %llu size %llu, %llu, %llu\n", rank, offset[0], + offset[1], offset[2], size[0], size[1], size[2]); + + for (r = 0; r < round; r++) { + if (r == round - 1 && use_chunk_cache > 0) + H5Pset_chunk_cache(dapl, 1228800, 4294967295, 1); + + dset = H5Dopen(file, dname, dapl); + dspace = H5Dget_space(dset); + H5Sget_simple_extent_dims(dspace, dims, NULL); + H5Sselect_hyperslab(dspace, H5S_SELECT_SET, offset, NULL, size, NULL); + + MPI_Barrier(MPI_COMM_WORLD); + t0 = MPI_Wtime(); + + H5Dread(dset, H5T_NATIVE_DOUBLE, mspace, dspace, dxpl, data); + + MPI_Barrier(MPI_COMM_WORLD); + t1 = MPI_Wtime(); + if (rank == 0) + fprintf(stderr, "Round %d, Read from HDF5 took %.4lf\n", 
r, t1 - t0); + + if (r != round - 1) { + // leave dset open for following patterns + H5Dclose(dset); + H5Sclose(dspace); + } + } + + H5Sclose(mspace); + + // Get some statistics of the data + int cnt[10] = {0, 0, 0, 0, 0, 0, 0, 0, 0, 0}; + for (r = 0; r < round; r++) { + MPI_Barrier(MPI_COMM_WORLD); + t0 = MPI_Wtime(); + for (data_i = 0; data_i < size[0] * size[1] * size[2]; data_i++) { + if (fabs(data[data_i]) > 0.1) + cnt[0]++; + if (fabs(data[data_i]) > 0.2) + cnt[1]++; + if (fabs(data[data_i]) > 0.3) + cnt[2]++; + if (fabs(data[data_i]) > 0.4) + cnt[3]++; + if (fabs(data[data_i]) > 0.5) + cnt[4]++; + if (fabs(data[data_i]) > 0.5) + cnt[5]++; + if (fabs(data[data_i]) > 0.7) + cnt[6]++; + if (fabs(data[data_i]) > 0.8) + cnt[7]++; + if (fabs(data[data_i]) > 0.9) + cnt[8]++; + if (fabs(data[data_i]) > 1.0) + cnt[9]++; + } + MPI_Barrier(MPI_COMM_WORLD); + t1 = MPI_Wtime(); + + if (rank == 0) + fprintf(stderr, "Round %d: Scanning data took %.4lf\n", r, t1 - t0); + } + + fprintf(stderr, "Rank %d: %d, %d, %d, %d, %d, %d, %d, %d, %d, %d\n", rank, cnt[0], cnt[1], cnt[2], cnt[3], + cnt[4], cnt[5], cnt[6], cnt[7], cnt[8], cnt[9]); + MPI_Barrier(MPI_COMM_WORLD); + + //=============PATTERN 2=============== + // OpenSees access pattern: 1 rank read subregion 200x200m (32 grids) + opensees_size = 32; // 32 * 6.25m = 200m + + offset[0] = (rank % devide_factor) * readt; + offset[1] = chunk_size[1] * start_x[rank / devide_factor]; + offset[2] = chunk_size[2] * start_y[rank / devide_factor]; + offset[3] = 0; + if (rank % devide_factor == devide_factor - 1) + size[0] = dims[0] - readt * (devide_factor - 1); + else + size[0] = readt; + size[1] = opensees_size; + size[2] = opensees_size; + size[3] = 1; + + mspace = H5Screate_simple(4, size, NULL); + + if (nproc <= 16) + fprintf(stderr, "Rank %d: offset %llu, %llu, %llu size %llu, %llu, %llu\n", rank, offset[0], + offset[1], offset[2], size[0], size[1], size[2]); + + if (rank == 0) + opensees_data = (double *)malloc(sizeof(double) * dims[0] * opensees_size * opensees_size); + + for (r = 0; r < round; r++) { + + if (rank == 0) + H5Sselect_hyperslab(dspace, H5S_SELECT_SET, offset, NULL, size, NULL); + else { + H5Sselect_none(mspace); + H5Sselect_none(dspace); + } + + MPI_Barrier(MPI_COMM_WORLD); + t0 = MPI_Wtime(); + + H5Dread(dset, H5T_NATIVE_DOUBLE, mspace, dspace, dxpl, opensees_data); + + MPI_Barrier(MPI_COMM_WORLD); + t1 = MPI_Wtime(); + if (rank == 0) + fprintf(stderr, "Round %d: rank 0 read OpenSees 200x200m data took %.6lf\n", r, t1 - t0); + + } // End for round + + H5Sclose(mspace); + free(opensees_data); + + //=============PATTERN 3=============== + // Generating movie: all rank downsample in space to 156.25x156.25m (downsample factor 25) + ssi_downsample = 25; + + offset[0] = (rank % devide_factor) * readt; + offset[1] = start_x[rank / devide_factor] * chunk_size[1]; + offset[2] = start_y[rank / devide_factor] * chunk_size[2]; + offset[3] = 0; + if (rank % devide_factor == devide_factor - 1) + size[0] = dims[0] - readt * (devide_factor - 1); + else + size[0] = readt; + size[1] = chunk_size[1] / ssi_downsample; + size[2] = chunk_size[2] / ssi_downsample; + size[3] = 1; + stride[0] = 1; + stride[1] = ssi_downsample; + stride[2] = ssi_downsample; + stride[3] = 1; + + mspace = H5Screate_simple(4, size, NULL); + + batchsize = chunk_size[1] * chunk_size[2] / ssi_downsample / ssi_downsample; + ssi_data = (double *)malloc(sizeof(double) * dims[0] * batchsize); + + for (r = 0; r < round; r++) { + H5Sselect_hyperslab(dspace, H5S_SELECT_SET, offset, stride, 
size, NULL); + + MPI_Barrier(MPI_COMM_WORLD); + t0 = MPI_Wtime(); + + H5Dread(dset, H5T_NATIVE_DOUBLE, mspace, dspace, dxpl, ssi_data); + + MPI_Barrier(MPI_COMM_WORLD); + t1 = MPI_Wtime(); + if (rank == 0) + fprintf(stderr, "Round %d: all ranks read ssi_downsample data took %.6lf\n", r, t1 - t0); + + } // End for round + + H5Sclose(mspace); + free(ssi_data); + + //=============PATTERN 4=============== + // Building response: all rank downsample in space to every 1250m (downsample factor 200) + rec_downsample = 200; + + offset[0] = (rank % devide_factor) * readt; + offset[1] = start_x[rank / devide_factor] * chunk_size[1]; + offset[2] = start_y[rank / devide_factor] * chunk_size[2]; + offset[3] = 0; + if (rank % devide_factor == devide_factor - 1) + size[0] = dims[0] - readt * (devide_factor - 1); + else + size[0] = readt; + size[1] = chunk_size[1] / rec_downsample; + size[2] = chunk_size[2] / rec_downsample; + size[3] = 1; + stride[0] = 1; + stride[1] = rec_downsample; + stride[2] = rec_downsample; + stride[3] = 1; + + mspace = H5Screate_simple(4, size, NULL); + + batchsize = chunk_size[1] * chunk_size[2] / rec_downsample / rec_downsample; + rec_data = (double *)malloc(sizeof(double) * dims[0] * batchsize); + + for (r = 0; r < round; r++) { + H5Sselect_hyperslab(dspace, H5S_SELECT_SET, offset, stride, size, NULL); + + MPI_Barrier(MPI_COMM_WORLD); + t0 = MPI_Wtime(); + + H5Dread(dset, H5T_NATIVE_DOUBLE, mspace, dspace, dxpl, rec_data); + + MPI_Barrier(MPI_COMM_WORLD); + t1 = MPI_Wtime(); + if (rank == 0) + fprintf(stderr, "Round %d: all ranks read rec_downsample data took %.6lf\n", r, t1 - t0); + + } // End for round + + H5Sclose(mspace); + + //=============PATTERN 5=============== + // Single rank singele time history access + + offset[0] = (rank % devide_factor) * readt; + offset[1] = start_x[rank / devide_factor] * chunk_size[1]; + offset[2] = start_y[rank / devide_factor] * chunk_size[2]; + offset[3] = 0; + if (rank % devide_factor == devide_factor - 1) + size[0] = dims[0] - readt * (devide_factor - 1); + else + size[0] = readt; + size[1] = 1; + size[2] = 1; + size[3] = 1; + + mspace = H5Screate_simple(4, size, NULL); + + rec_data = (double *)malloc(sizeof(double) * dims[0] * batchsize); + + for (r = 0; r < round; r++) { + if (rank == 0) + H5Sselect_hyperslab(dspace, H5S_SELECT_SET, offset, NULL, size, NULL); + else { + H5Sselect_none(mspace); + H5Sselect_none(dspace); + } + + MPI_Barrier(MPI_COMM_WORLD); + t0 = MPI_Wtime(); + + H5Dread(dset, H5T_NATIVE_DOUBLE, mspace, dspace, dxpl, rec_data); + + MPI_Barrier(MPI_COMM_WORLD); + t1 = MPI_Wtime(); + if (rank == 0) + fprintf(stderr, "Round %d: rank 0 read 1 time-history took %.6lf\n", r, t1 - t0); + + } // End for round + + H5Sclose(mspace); + free(rec_data); + + H5Sclose(dspace); + + H5Pclose(dapl); + H5Pclose(fapl); + H5Pclose(dxpl); + H5Dclose(dset); + H5Fclose(file); + H5close(); + + MPI_Finalize(); + return 0; +} diff --git a/src/tools/pdc_access_eqsim.c b/src/tools/pdc_access_eqsim.c new file mode 100644 index 00000000..aa97eda7 --- /dev/null +++ b/src/tools/pdc_access_eqsim.c @@ -0,0 +1,343 @@ +#include +#include +#include +#include +#include + +#ifdef ENABLE_MPI +#include "mpi.h" +#endif + +#include "pdc.h" + +int +main(int argc, char **argv) +{ + pdcid_t pdc, cont, obj_prop, obj, local_reg, remote_reg, transfer_req, *transfer_batch; + int i, j, r, count, total_count, rank, nproc, ssi_downsample, rec_downsample, batchsize, iter, + opensees_size, round = 1; + int start_x[4096], start_y[4096]; + uint64_t pdc_dims[3], pdc_offset[3], 
pdc_size[3], pdc_local_offset[3], pdc_local_size[3]; + psize_t value_size; + // 12x, 32x, 32x + char * fname, tag_name[128]; + uint64_t dims[4] = {4634, 19201, 12801, 1}, chunk_size[4] = {400, 600, 400, 1}; + double * data = NULL, t0, t1, t2, data_max, data_min, *ssi_data = NULL, *rec_data = NULL, + *opensees_data = NULL, *tag_value = NULL; + pdc_var_type_t value_type; + +#ifdef ENABLE_MPI + MPI_Init(&argc, &argv); + MPI_Comm_rank(MPI_COMM_WORLD, &rank); + MPI_Comm_size(MPI_COMM_WORLD, &nproc); +#endif + + if (argc > 1) + round = atoi(argv[1]); + + // Assign chunks to each rank + count = 0; + for (i = 0; i < dims[1] / chunk_size[1]; i++) { + for (j = 0; j < dims[2] / chunk_size[2]; j++) { + start_x[count] = i; + start_y[count] = j; + count++; + } + } + + pdc = PDCinit("pdc"); + + obj = PDCobj_open_col("run1", pdc); + if (obj <= 0) + fprintf(stderr, "Fail to open object @ line %d!\n", __LINE__); + + /* //=============PATTERN 1=============== */ + /* // Read everything */ + /* transfer_req = PDCregion_transfer_create(data, PDC_READ, obj, local_reg, remote_reg); */ + /* PDCregion_transfer_start(transfer_req); */ + /* PDCregion_transfer_wait(transfer_req); */ + /* PDCregion_transfer_close(transfer_req); */ + + /* #ifdef ENABLE_MPI */ + /* MPI_Barrier(MPI_COMM_WORLD); */ + /* t1 = MPI_Wtime(); */ + /* if (rank == 0) */ + /* fprintf(stderr, "Read whole chunk data from server took %.6lf\n", t1 - t0); */ + /* #endif */ + + /* data_max = -1; */ + /* data_min = 1; */ + /* for (i = 0; i < pdc_local_size[0]*pdc_local_size[1]*pdc_local_size[2]; i++) { */ + /* if (data[i] > data_max) */ + /* data_max = data[i]; */ + /* else if (data[i] < data_min) */ + /* data_min = data[i]; */ + /* } */ + /* fprintf(stderr, "Rank %d, chunk min/max: %lf, %lf\n", rank, data_min, data_max); */ + + /* PDCregion_close(remote_reg); */ + /* PDCregion_close(local_reg); */ + + /* free(data); */ + + //=============PATTERN 2=============== + // OpenSees access pattern: 1 rank read subregion 200x200m (32 grids) + opensees_size = 32; // 32 * 6.25m = 200m + pdc_local_offset[0] = 0; + pdc_local_offset[1] = 0; + pdc_local_offset[2] = 0; + pdc_local_size[0] = dims[0]; + pdc_local_size[1] = opensees_size; + pdc_local_size[2] = opensees_size; + local_reg = PDCregion_create(3, pdc_local_offset, pdc_local_size); + + pdc_offset[0] = 0; + pdc_offset[1] = start_x[rank] * chunk_size[1]; + pdc_offset[2] = start_y[rank] * chunk_size[2]; + pdc_size[0] = dims[0]; + pdc_size[1] = opensees_size; + pdc_size[2] = opensees_size; + remote_reg = PDCregion_create(3, pdc_offset, pdc_size); + + if (nproc <= 16) + fprintf(stderr, "Rank %d: offset %llu, %llu, %llu size %llu, %llu, %llu\n", rank, pdc_offset[0], + pdc_offset[1], pdc_offset[2], pdc_size[0], pdc_size[1], pdc_size[2]); + + // Tag retrieval + sprintf(tag_name, "%llu-%llu\n", pdc_offset[1], pdc_offset[2]); + for (r = 0; r < round; r++) { + tag_value = NULL; +#ifdef ENABLE_MPI + MPI_Barrier(MPI_COMM_WORLD); + t0 = MPI_Wtime(); +#endif + PDCobj_get_tag(obj, tag_name, (void **)&tag_value, &value_type, &value_size); + +#ifdef ENABLE_MPI + MPI_Barrier(MPI_COMM_WORLD); + t1 = MPI_Wtime(); + if (rank == 0) + fprintf(stderr, "Round %d: tag retrival query took %.6lf\n", r, t1 - t0); +#endif + if (value_size != 4 * sizeof(double)) + fprintf(stderr, "Error: Round %d: tag retrival result size %llu / %llu \n", r, value_size, + 4 * sizeof(double)); + if (tag_value) + free(tag_value); + } + + if (rank == 0) + opensees_data = (double *)malloc(sizeof(double) * dims[0] * opensees_size * opensees_size); + + for (r = 
0; r < round; r++) { +#ifdef ENABLE_MPI + MPI_Barrier(MPI_COMM_WORLD); + t0 = MPI_Wtime(); +#endif + + if (rank == 0) { + transfer_req = PDCregion_transfer_create(opensees_data, PDC_READ, obj, local_reg, remote_reg); + PDCregion_transfer_start(transfer_req); + PDCregion_transfer_wait(transfer_req); + } + +#ifdef ENABLE_MPI + MPI_Barrier(MPI_COMM_WORLD); + t1 = MPI_Wtime(); + if (rank == 0) + fprintf(stderr, "Round %d: rank 0 read OpenSees 200x200m data from server took %.6lf\n", r, + t1 - t0); +#endif + + if (rank == 0) + PDCregion_transfer_close(transfer_req); + } // End for round + + if (rank == 0) { + PDCregion_close(remote_reg); + PDCregion_close(local_reg); + } + + //=============PATTERN 3=============== + // Generating movie: all rank downsample in space to 156.25x156.25m (downsample factor 25) + ssi_downsample = 25; + pdc_local_offset[0] = 0; + pdc_local_offset[1] = 0; + pdc_local_offset[2] = 0; + pdc_local_size[0] = dims[0]; + pdc_local_size[1] = 1; + pdc_local_size[2] = 1; + local_reg = PDCregion_create(3, pdc_local_offset, pdc_local_size); + + pdc_size[0] = dims[0]; + pdc_size[1] = 1; + pdc_size[2] = 1; + + batchsize = chunk_size[1] * chunk_size[2] / ssi_downsample / ssi_downsample; + transfer_batch = (pdcid_t *)malloc(sizeof(pdcid_t) * batchsize); + ssi_data = (double *)malloc(sizeof(double) * dims[0] * batchsize); + + for (r = 0; r < round; r++) { +#ifdef ENABLE_MPI + MPI_Barrier(MPI_COMM_WORLD); + t0 = MPI_Wtime(); +#endif + + iter = 0; + for (i = 0; i < chunk_size[1] / ssi_downsample; i++) { + for (j = 0; j < chunk_size[2] / ssi_downsample; j++) { + pdc_offset[0] = 0; + pdc_offset[1] = i * ssi_downsample + start_x[rank] * chunk_size[1]; + pdc_offset[2] = j * ssi_downsample + start_y[rank] * chunk_size[2]; + remote_reg = PDCregion_create(3, pdc_offset, pdc_size); + + transfer_batch[iter] = PDCregion_transfer_create(&ssi_data[dims[0] * iter], PDC_READ, obj, + local_reg, remote_reg); + PDCregion_close(remote_reg); + iter++; + } + } + + PDCregion_transfer_start_all(transfer_batch, batchsize); + PDCregion_transfer_wait_all(transfer_batch, batchsize); + +#ifdef ENABLE_MPI + MPI_Barrier(MPI_COMM_WORLD); + t1 = MPI_Wtime(); + if (rank == 0) + fprintf(stderr, "Round %d: all ranks read ssi_downsample data from server took %.6lf\n", r, + t1 - t0); +#endif + + for (i = 0; i < batchsize; i++) + PDCregion_transfer_close(transfer_batch[i]); + } // End for round + + PDCregion_close(local_reg); + free(ssi_data); + + //=============PATTERN 4=============== + // Building response: all rank downsample in space to every 1250m (downsample factor 200) + rec_downsample = 200; + pdc_local_offset[0] = 0; + pdc_local_offset[1] = 0; + pdc_local_offset[2] = 0; + pdc_local_size[0] = dims[0]; + pdc_local_size[1] = 1; + pdc_local_size[2] = 1; + local_reg = PDCregion_create(3, pdc_local_offset, pdc_local_size); + + pdc_size[0] = dims[0]; + pdc_size[1] = 1; + pdc_size[2] = 1; + + batchsize = chunk_size[1] * chunk_size[2] / rec_downsample / rec_downsample; + transfer_batch = (pdcid_t *)malloc(sizeof(pdcid_t) * batchsize); + rec_data = (double *)malloc(sizeof(double) * dims[0] * batchsize); + + for (r = 0; r < round; r++) { +#ifdef ENABLE_MPI + MPI_Barrier(MPI_COMM_WORLD); + t0 = MPI_Wtime(); +#endif + + iter = 0; + for (i = 0; i < chunk_size[1] / rec_downsample; i++) { + for (j = 0; j < chunk_size[2] / rec_downsample; j++) { + pdc_offset[0] = 0; + pdc_offset[1] = i * rec_downsample + start_x[rank] * chunk_size[1]; + pdc_offset[2] = j * rec_downsample + start_y[rank] * chunk_size[2]; + remote_reg = 
PDCregion_create(3, pdc_offset, pdc_size); + + transfer_batch[iter] = PDCregion_transfer_create(&rec_data[dims[0] * iter], PDC_READ, obj, + local_reg, remote_reg); + PDCregion_close(remote_reg); + iter++; + } + } + + PDCregion_transfer_start_all(transfer_batch, batchsize); + PDCregion_transfer_wait_all(transfer_batch, batchsize); + +#ifdef ENABLE_MPI + MPI_Barrier(MPI_COMM_WORLD); + t1 = MPI_Wtime(); + if (rank == 0) + fprintf(stderr, "Round %d: all ranks read rec_downsample data from server took %.6lf\n", r, + t1 - t0); +#endif + + for (i = 0; i < batchsize; i++) + PDCregion_transfer_close(transfer_batch[i]); + } // End for round + + PDCregion_close(local_reg); + + //=============PATTERN 5=============== + // Single rank singele time history access + pdc_offset[0] = 0; + pdc_offset[1] = chunk_size[1] / 2 + start_x[rank] * chunk_size[1]; + pdc_offset[2] = chunk_size[2] / 2 + start_y[rank] * chunk_size[2]; + pdc_size[0] = dims[0]; + pdc_size[1] = 1; + pdc_size[2] = 1; + remote_reg = PDCregion_create(3, pdc_offset, pdc_size); + + pdc_local_offset[0] = 0; + pdc_local_offset[1] = 0; + pdc_local_offset[2] = 0; + pdc_local_size[0] = dims[0]; + pdc_local_size[1] = 1; + pdc_local_size[2] = 1; + local_reg = PDCregion_create(3, pdc_local_offset, pdc_local_size); + + for (r = 0; r < round; r++) { +#ifdef ENABLE_MPI + t0 = MPI_Wtime(); +#endif + + if (rank == 0) { + transfer_req = PDCregion_transfer_create(rec_data, PDC_READ, obj, local_reg, remote_reg); + PDCregion_transfer_start(transfer_req); + PDCregion_transfer_wait(transfer_req); + PDCregion_transfer_close(transfer_req); + } + +#ifdef ENABLE_MPI + MPI_Barrier(MPI_COMM_WORLD); + t1 = MPI_Wtime(); + if (rank == 0) + fprintf(stderr, "Round %d: rank 0 read single time series from server took %.6lf\n", r, t1 - t0); +#endif + + /* if (rank == 0) { */ + /* data_max = -1; */ + /* data_min = 1; */ + /* for (i = 0; i < pdc_local_size[0]; i++) { */ + /* if (rec_data[i] > data_max) */ + /* data_max = rec_data[i]; */ + /* else if (rec_data[i] < data_min) */ + /* data_min = rec_data[i]; */ + /* } */ + /* fprintf(stderr, "Round %d: rank %d single series min/max: %lf, %lf\n", r, rank, data_min, + * data_max); */ + /* } */ + + } // End for round + + PDCregion_close(remote_reg); + PDCregion_close(local_reg); + + free(rec_data); + + if (PDCobj_close(obj) < 0) + fprintf(stderr, "fail to close object\n"); + + if (PDCclose(pdc) < 0) + fprintf(stderr, "fail to close PDC\n"); + +#ifdef ENABLE_MPI + MPI_Finalize(); +#endif + return 0; +} diff --git a/src/tools/pdc_import_eqsim.c b/src/tools/pdc_import_eqsim.c new file mode 100644 index 00000000..2854e430 --- /dev/null +++ b/src/tools/pdc_import_eqsim.c @@ -0,0 +1,213 @@ +#include +#include +#include +#include +#include + +#ifdef ENABLE_MPI +#include "mpi.h" +#endif + +#include "hdf5.h" +#include "pdc.h" + +int +main(int argc, char **argv) +{ + pdcid_t pdc, cont_prop, cont, obj_prop, obj, local_reg, remote_reg, transfer_req, *transfer_batch; + + hid_t file, grp, dset, fapl, dxpl, dspace, mspace; + herr_t status; + + int i, j, count, total_count, rank, nproc, ssi_downsample, rec_downsample, batchsize, iter, opensees_size; + int start_x[4096], start_y[4096], max_start_x = 0, max_start_y = 0; + hsize_t offset[4], size[4], local_offset[4], local_size[4]; + hsize_t dims[4] = {4634, 19201, 12801, 1}, chunk_size[4] = {400, 600, 400, 1}; + uint64_t pdc_dims[3], pdc_offset[3], pdc_size[3], pdc_local_offset[3], pdc_local_size[3]; + uint32_t value_size; + // 12x, 32x, 32x + char * fname, *dname = "vel_0 ijk layout", tag_name[128]; 
+ double *data = NULL, t0, t1, t2, data_max, data_min, *ssi_data = NULL, *rec_data = NULL, + *opensees_data = NULL, tag_value[4]; + +#ifdef ENABLE_MPI + MPI_Init(&argc, &argv); + MPI_Comm_rank(MPI_COMM_WORLD, &rank); + MPI_Comm_size(MPI_COMM_WORLD, &nproc); +#endif + + fname = argv[1]; + + fapl = H5Pcreate(H5P_FILE_ACCESS); + H5Pset_fapl_mpio(fapl, MPI_COMM_WORLD, MPI_INFO_NULL); + + dxpl = H5Pcreate(H5P_DATASET_XFER); + H5Pset_dxpl_mpio(dxpl, H5FD_MPIO_COLLECTIVE); + + file = H5Fopen(fname, H5F_ACC_RDONLY, fapl); + if (file < 0) + fprintf(stderr, "Failed to open file [%s]\n", fname); + + dset = H5Dopen(file, dname, H5P_DEFAULT); + dspace = H5Dget_space(dset); + H5Sget_simple_extent_dims(dspace, dims, NULL); + + // Assign chunks to each rank + count = 0; + for (i = 0; i < dims[1] / chunk_size[1]; i++) { + for (j = 0; j < dims[2] / chunk_size[2]; j++) { + start_x[count] = i; + start_y[count] = j; + count++; + } + } + + offset[0] = 0; + offset[1] = chunk_size[1] * start_x[rank]; + offset[2] = chunk_size[2] * start_y[rank]; + offset[3] = 0; + + /* size[0] = chunk_size[0]; */ + size[0] = dims[0]; + size[1] = chunk_size[1]; + size[2] = chunk_size[2]; + size[3] = 1; + + H5Sselect_hyperslab(dspace, H5S_SELECT_SET, offset, NULL, size, NULL); + + local_offset[0] = 0; + local_offset[1] = 0; + local_offset[2] = 0; + local_offset[3] = 0; + + /* local_size[0] = chunk_size[0]; */ + local_size[0] = dims[0]; + local_size[1] = chunk_size[1]; + local_size[2] = chunk_size[2]; + local_size[3] = 1; + + mspace = H5Screate_simple(4, local_size, NULL); + + data = (double *)malloc(sizeof(double) * local_size[0] * local_size[1] * local_size[2]); + + if (nproc <= 16) + fprintf(stderr, "Rank %d: offset %llu, %llu, %llu size %llu, %llu, %llu\n", rank, offset[0], + offset[1], offset[2], size[0], size[1], size[2]); + +#ifdef ENABLE_MPI + t0 = MPI_Wtime(); +#endif + + H5Dread(dset, H5T_NATIVE_DOUBLE, mspace, dspace, dxpl, data); + +#ifdef ENABLE_MPI + MPI_Barrier(MPI_COMM_WORLD); + t1 = MPI_Wtime(); + if (rank == 0) + fprintf(stderr, "Read from HDF5 took %.4lf\n", t1 - t0); +#endif + + H5Sclose(mspace); + H5Sclose(dspace); + H5Pclose(fapl); + H5Pclose(dxpl); + H5Dclose(dset); + H5Fclose(file); + H5close(); + + // End of HDF5 operations + + for (i = 0; i < nproc; i++) { + if (start_x[i] > max_start_x) + max_start_x = start_x[i]; + if (start_y[i] > max_start_y) + max_start_y = start_y[i]; + } + pdc_dims[0] = dims[0]; + pdc_dims[1] = max_start_x + chunk_size[1]; + pdc_dims[2] = max_start_y + chunk_size[2]; + fprintf(stderr, "Rank %d: create obj dims %llu %llu %llu\n", rank, pdc_dims[0], pdc_dims[1], pdc_dims[2]); + + for (i = 0; i < 3; i++) { + pdc_offset[i] = (uint64_t)offset[i]; + pdc_size[i] = (uint64_t)size[i]; + pdc_local_offset[i] = (uint64_t)local_offset[i]; + pdc_local_size[i] = (uint64_t)local_size[i]; + } + + pdc = PDCinit("pdc"); + + cont_prop = PDCprop_create(PDC_CONT_CREATE, pdc); + cont = PDCcont_create("ssioutput", cont_prop); + obj_prop = PDCprop_create(PDC_OBJ_CREATE, pdc); + + PDCprop_set_obj_dims(obj_prop, 3, pdc_dims); + PDCprop_set_obj_type(obj_prop, PDC_DOUBLE); + PDCprop_set_obj_time_step(obj_prop, 0); + PDCprop_set_obj_user_id(obj_prop, getuid()); + PDCprop_set_obj_app_name(obj_prop, "EQSIM"); + PDCprop_set_obj_transfer_region_type(obj_prop, PDC_REGION_LOCAL); + + obj = PDCobj_create_mpi(cont, "run1", obj_prop, 0, MPI_COMM_WORLD); + /* obj = PDCobj_create(cont, "run1", obj_prop); */ + if (obj <= 0) + fprintf(stderr, "Fail to create object @ line %d!\n", __LINE__); + + remote_reg = PDCregion_create(3, 
pdc_offset, pdc_size); + + local_reg = PDCregion_create(3, pdc_local_offset, pdc_local_size); + + // Create a tag for each region, using its x/y offset as name + sprintf(tag_name, "%llu-%llu\n", pdc_offset[1], pdc_offset[2]); + // Dummy value + tag_value[0] = rank * 0.0; + tag_value[1] = rank * 1.0; + tag_value[2] = rank * 10.0; + tag_value[3] = rank * 11.0; + value_size = 4 * sizeof(double); + + if (PDCobj_put_tag(obj, tag_name, tag_value, PDC_DOUBLE, value_size) < 0) + fprintf(stderr, "Rank %d fail to put tag @ line %d!\n", rank, __LINE__); + +#ifdef ENABLE_MPI + MPI_Barrier(MPI_COMM_WORLD); + t0 = MPI_Wtime(); +#endif + + transfer_req = PDCregion_transfer_create(data, PDC_WRITE, obj, local_reg, remote_reg); + PDCregion_transfer_start(transfer_req); + PDCregion_transfer_wait(transfer_req); + PDCregion_transfer_close(transfer_req); + +#ifdef ENABLE_MPI + MPI_Barrier(MPI_COMM_WORLD); + t1 = MPI_Wtime(); + if (rank == 0) + fprintf(stderr, "Write data to server took %.4lf\n", t1 - t0); +#endif + + free(data); + + PDCregion_close(remote_reg); + PDCregion_close(local_reg); + + if (PDCobj_close(obj) < 0) + fprintf(stderr, "fail to close object\n"); + + if (PDCcont_close(cont) < 0) + fprintf(stderr, "fail to close container c1\n"); + + if (PDCprop_close(obj_prop) < 0) + fprintf(stderr, "Fail to close property @ line %d\n", __LINE__); + + if (PDCprop_close(cont_prop) < 0) + fprintf(stderr, "Fail to close property @ line %d\n", __LINE__); + + if (PDCclose(pdc) < 0) + fprintf(stderr, "fail to close PDC\n"); + +#ifdef ENABLE_MPI + MPI_Finalize(); +#endif + return 0; +} diff --git a/src/tools/pdc_query_eqsim.c b/src/tools/pdc_query_eqsim.c new file mode 100644 index 00000000..29918544 --- /dev/null +++ b/src/tools/pdc_query_eqsim.c @@ -0,0 +1,241 @@ +#include +#include +#include +#include +#include + +#ifdef ENABLE_MPI +#include "mpi.h" +#endif + +#include "hdf5.h" +#include "pdc.h" +#include "pdc_client_server_common.h" + +int +main(int argc, char **argv) +{ + pdcid_t pdc, cont_prop, cont, obj_prop, obj, local_reg, remote_reg, transfer_req, *transfer_batch; + + hid_t file, grp, dset, fapl, dxpl, dspace, mspace; + herr_t status; + + int i, j, count, total_count, rank, nproc, ssi_downsample, rec_downsample, batchsize, iter, opensees_size; + int start_x[4096], start_y[4096]; + hsize_t offset[4], size[4], local_offset[4], local_size[4]; + hsize_t dims[4] = {4634, 19201, 12801, 1}, chunk_size[4] = {400, 600, 400, 1}; + uint64_t pdc_dims[3], pdc_offset[3], pdc_size[3], pdc_local_offset[3], pdc_local_size[3]; + uint32_t value_size; + // 12x, 32x, 32x + char * fname, *dname = "vel_0 ijk layout", tag_name[128]; + double *data = NULL, t0, t1, t2, data_max, data_min, *ssi_data = NULL, *rec_data = NULL, + *opensees_data = NULL, tag_value[4]; + +#ifdef ENABLE_MPI + MPI_Init(&argc, &argv); + MPI_Comm_rank(MPI_COMM_WORLD, &rank); + MPI_Comm_size(MPI_COMM_WORLD, &nproc); +#endif + + fname = argv[1]; + + fapl = H5Pcreate(H5P_FILE_ACCESS); + H5Pset_fapl_mpio(fapl, MPI_COMM_WORLD, MPI_INFO_NULL); + + dxpl = H5Pcreate(H5P_DATASET_XFER); + H5Pset_dxpl_mpio(dxpl, H5FD_MPIO_COLLECTIVE); + + file = H5Fopen(fname, H5F_ACC_RDONLY, fapl); + if (file < 0) + fprintf(stderr, "Failed to open file [%s]\n", fname); + + dset = H5Dopen(file, dname, H5P_DEFAULT); + dspace = H5Dget_space(dset); + H5Sget_simple_extent_dims(dspace, dims, NULL); + + // Assign chunks to each rank + count = 0; + for (i = 0; i < dims[1] / chunk_size[1]; i++) { + for (j = 0; j < dims[2] / chunk_size[2]; j++) { + start_x[count] = i; + start_y[count] = j; + 
count++; + } + } + + offset[0] = 0; + offset[1] = chunk_size[1] * start_x[rank]; + offset[2] = chunk_size[2] * start_y[rank]; + offset[3] = 0; + + /* size[0] = chunk_size[0]; */ + size[0] = dims[0]; + size[1] = chunk_size[1]; + size[2] = chunk_size[2]; + size[3] = 1; + + H5Sselect_hyperslab(dspace, H5S_SELECT_SET, offset, NULL, size, NULL); + + local_offset[0] = 0; + local_offset[1] = 0; + local_offset[2] = 0; + local_offset[3] = 0; + + /* local_size[0] = chunk_size[0]; */ + local_size[0] = dims[0]; + local_size[1] = chunk_size[1]; + local_size[2] = chunk_size[2]; + local_size[3] = 1; + + mspace = H5Screate_simple(4, local_size, NULL); + + data = (double *)malloc(sizeof(double) * local_size[0] * local_size[1] * local_size[2]); + + if (nproc <= 16) + fprintf(stderr, "Rank %d: offset %llu, %llu, %llu size %llu, %llu, %llu\n", rank, offset[0], + offset[1], offset[2], size[0], size[1], size[2]); + +#ifdef ENABLE_MPI + t0 = MPI_Wtime(); +#endif + + H5Dread(dset, H5T_NATIVE_DOUBLE, mspace, dspace, dxpl, data); + +#ifdef ENABLE_MPI + MPI_Barrier(MPI_COMM_WORLD); + t1 = MPI_Wtime(); + if (rank == 0) + fprintf(stderr, "Read from HDF5 took %.4lf\n", t1 - t0); +#endif + + H5Sclose(mspace); + H5Sclose(dspace); + H5Pclose(fapl); + H5Pclose(dxpl); + H5Dclose(dset); + H5Fclose(file); + H5close(); + + // End of HDF5 operations + + for (i = 0; i < 3; i++) { + pdc_dims[i] = PDC_SIZE_UNLIMITED; + pdc_offset[i] = (uint64_t)offset[i]; + pdc_size[i] = (uint64_t)size[i]; + pdc_local_offset[i] = (uint64_t)local_offset[i]; + pdc_local_size[i] = (uint64_t)local_size[i]; + } + + pdc = PDCinit("pdc"); + + cont_prop = PDCprop_create(PDC_CONT_CREATE, pdc); + cont = PDCcont_create("ssioutput", cont_prop); + obj_prop = PDCprop_create(PDC_OBJ_CREATE, pdc); + + PDCprop_set_obj_dims(obj_prop, 3, pdc_dims); + PDCprop_set_obj_type(obj_prop, PDC_DOUBLE); + PDCprop_set_obj_time_step(obj_prop, 0); + PDCprop_set_obj_user_id(obj_prop, getuid()); + PDCprop_set_obj_app_name(obj_prop, "EQSIM"); + /* PDCprop_set_obj_transfer_region_type(obj_prop, PDC_REGION_LOCAL); */ + + obj = PDCobj_create_mpi(cont, "run1", obj_prop, 0, MPI_COMM_WORLD); + /* obj = PDCobj_create(cont, "run1", obj_prop); */ + if (obj <= 0) + fprintf(stderr, "Fail to create object @ line %d!\n", __LINE__); + + remote_reg = PDCregion_create(3, pdc_offset, pdc_size); + + local_reg = PDCregion_create(3, pdc_local_offset, pdc_local_size); + + // Create a tag for each region, using its x/y offset as name + sprintf(tag_name, "%llu-%llu\n", pdc_offset[1], pdc_offset[2]); + // Dummy value + tag_value[0] = rank * 0.0; + tag_value[1] = rank * 1.0; + tag_value[2] = rank * 10.0; + tag_value[3] = rank * 11.0; + value_size = 4 * sizeof(double); + + if (PDCobj_put_tag(obj, tag_name, tag_value, PDC_DOUBLE, value_size) < 0) + fprintf(stderr, "Rank %d fail to put tag @ line %d!\n", rank, __LINE__); + + // Query the created object + pdc_metadata_t *metadata; + uint32_t metadata_server_id; + PDC_Client_query_metadata_name_timestep("run1", 0, &metadata, &metadata_server_id); + if (metadata == NULL || metadata->obj_id == 0) { + printf("Error with metadata!\n"); + } + + int ndim = 3; + struct pdc_region_info region; + region.ndim = ndim; + region.offset = pdc_offset; + region.size = pdc_size; + +#ifdef ENABLE_MPI + MPI_Barrier(MPI_COMM_WORLD); + t0 = MPI_Wtime(); +#endif + + PDC_Client_write(metadata, ®ion, data); + +#ifdef ENABLE_MPI + MPI_Barrier(MPI_COMM_WORLD); + t1 = MPI_Wtime(); + if (rank == 0) + fprintf(stderr, "Write data to server took %.4lf\n", t1 - t0); +#endif + + // Construct 
query constraints + double query_val = 0.9; + pdc_query_t *q = PDCquery_create(obj, PDC_GT, PDC_INT, &query_val); + +#ifdef ENABLE_MPI + MPI_Barrier(MPI_COMM_WORLD); + t0 = MPI_Wtime(); +#endif + + pdc_selection_t sel; + + if (rank == 0) + PDCquery_get_selection(q, &sel); + +#ifdef ENABLE_MPI + MPI_Barrier(MPI_COMM_WORLD); + t1 = MPI_Wtime(); + if (rank == 0) + fprintf(stderr, "Query data took %.4lf\n", t1 - t0); +#endif + + /* PDCselection_print(&sel); */ + + PDCquery_free_all(q); + PDCselection_free(&sel); + + free(data); + + PDCregion_close(remote_reg); + PDCregion_close(local_reg); + + if (PDCobj_close(obj) < 0) + fprintf(stderr, "fail to close object\n"); + + if (PDCcont_close(cont) < 0) + fprintf(stderr, "fail to close container c1\n"); + + if (PDCprop_close(obj_prop) < 0) + fprintf(stderr, "Fail to close property @ line %d\n", __LINE__); + + if (PDCprop_close(cont_prop) < 0) + fprintf(stderr, "Fail to close property @ line %d\n", __LINE__); + + if (PDCclose(pdc) < 0) + fprintf(stderr, "fail to close PDC\n"); + +#ifdef ENABLE_MPI + MPI_Finalize(); +#endif + return 0; +}
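
Note on the new compression path: the server-side ZFP support added above hard-codes ZFP's fixed-accuracy mode (tolerance 0.01 in ``_setup_zfp``). The following is a minimal standalone sketch of that compress/decompress round trip; it mirrors the zfp calls used in ``pdc_server_data.c`` but is independent of PDC, and the array shape, tolerance value, build command, and ``main`` wrapper are illustrative assumptions only, not part of this patch.

/*
 * Standalone sketch of the ZFP fixed-accuracy round trip used by the new
 * write/read paths in pdc_server_data.c (illustrative only, not part of PDC).
 * Assumed build: cc zfp_roundtrip.c -lzfp -lm
 */
#include <stdio.h>
#include <stdlib.h>
#include <math.h>
#include "zfp.h"

int
main(void)
{
    size_t  nx = 400, ny = 600; /* one 2D storage region (assumed shape) */
    size_t  n  = nx * ny, i;
    double *in  = (double *)malloc(n * sizeof(double));
    double *out = (double *)malloc(n * sizeof(double));

    for (i = 0; i < n; i++)
        in[i] = sin(0.001 * (double)i);

    /* Compress: same call sequence as _setup_zfp + PDC_Server_data_write_out */
    zfp_field * field = zfp_field_2d(in, zfp_type_double, nx, ny);
    zfp_stream *zfp   = zfp_stream_open(NULL);
    zfp_stream_set_accuracy(zfp, 0.01); /* accuracy mode, same tolerance as _setup_zfp */

    size_t     bufsize = zfp_stream_maximum_size(zfp, field);
    void *     buffer  = malloc(bufsize);
    bitstream *stream  = stream_open(buffer, bufsize);
    zfp_stream_set_bit_stream(zfp, stream);

    size_t csize = zfp_compress(zfp, field);
    if (csize == 0)
        fprintf(stderr, "zfp_compress failed\n");
    else
        printf("compressed %zu -> %zu bytes, CR=%.2lf\n", n * sizeof(double), csize,
               (double)(n * sizeof(double)) / csize);

    /* Decompress into a separate buffer, as PDC_Server_data_read_from does */
    zfp_stream_rewind(zfp);
    zfp_field_set_pointer(field, out);
    if (zfp_decompress(zfp, field) == 0)
        fprintf(stderr, "zfp_decompress failed\n");

    zfp_field_free(field);
    zfp_stream_close(zfp);
    stream_close(stream);
    free(buffer);
    free(in);
    free(out);
    return 0;
}

In accuracy mode the tolerance bounds the absolute error of each reconstructed value and the decompressed data always has the original region size, which is why the read path above can size its decompression buffer directly from ``region_info->size`` and ``region_info->unit``.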