Skip to content

Commit

Permalink
Merge pull request #19 from hpc-io/develop
Browse files Browse the repository at this point in the history
Checkpoint for implementations in develop branch
  • Loading branch information
houjun authored May 19, 2021
2 parents 132d1f5 + f27393d commit 3dc8534
Show file tree
Hide file tree
Showing 7 changed files with 213 additions and 17 deletions.
2 changes: 2 additions & 0 deletions src/api/pdc.c
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,8 @@
#include "pdc_interface.h"
#include "pdc_client_connect.h"

#include "pdc_timing.h"

pbool_t err_occurred = FALSE;

perr_t PDC_class__close(struct _pdc_class *p);
Expand Down
2 changes: 1 addition & 1 deletion src/api/pdc_client_server_common.c
Original file line number Diff line number Diff line change
Expand Up @@ -2144,7 +2144,7 @@ buf_map_region_release_bulk_transfer_cb(const struct hg_cb_info *hg_cb_info)
(remote_reg_info->offset)[2] = (bulk_args->remote_region_nounit).start_2;
(remote_reg_info->size)[2] = (bulk_args->remote_region_nounit).count_2;
}

PDC_Server_data_write_out(bulk_args->remote_obj_id, remote_reg_info, bulk_args->data_buf, (bulk_args->in).data_unit);

// Perform lock release function
Expand Down
1 change: 1 addition & 0 deletions src/api/pdc_region.h
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ struct pdc_region_info {
bool mapping;
int registered_op;
void *buf;
size_t unit;
};

/*********************/
Expand Down
35 changes: 32 additions & 3 deletions src/server/pdc_server.c
Original file line number Diff line number Diff line change
Expand Up @@ -148,6 +148,7 @@ double total_mem_usage_g = 0.0;
pdc_data_server_io_list_t *pdc_data_server_read_list_head_g = NULL;
pdc_data_server_io_list_t *pdc_data_server_write_list_head_g = NULL;
update_storage_meta_list_t *pdc_update_storage_meta_list_head_g = NULL;
extern data_server_region_t *dataserver_region_g;


/*
Expand Down Expand Up @@ -1147,12 +1148,27 @@ perr_t PDC_Server_checkpoint()
}
}

region_count += n_region;
if (n_write_region != n_region) {
printf("==PDC_SERVER[%d]: %s - ERROR with number of regions", pdc_server_rank_g, __func__);
ret_value = FAIL;
goto done;
}

// Write storage region info
data_server_region_t *region = NULL;
region = PDC_Server_get_obj_region(elt->obj_id);
if(region) {
DL_COUNT(region->region_storage_head, region_elt, n_region);
fwrite(&n_region, sizeof(int), 1, file);
DL_FOREACH(region->region_storage_head, region_elt) {
fwrite(region_elt, sizeof(region_list_t), 1, file);
}
}
else {
fwrite(&n_region, sizeof(int), 1, file);
}

metadata_size++;
region_count += n_region;
}
Expand Down Expand Up @@ -1315,8 +1331,8 @@ perr_t PDC_Server_restart(char *filename)
goto done;
}

if (n_region == 0)
continue;
/* if (n_region == 0) */
/* continue; */

total_region += n_region;

Expand Down Expand Up @@ -1381,6 +1397,20 @@ perr_t PDC_Server_restart(char *filename)

DL_APPEND((metadata+i)->storage_region_list_head, region_list);
} // For j

// read storage region info
fread(&n_region, sizeof(int), 1, file);
data_server_region_t *new_obj_reg = (data_server_region_t *)calloc(1, sizeof(struct data_server_region_t));
DL_APPEND(dataserver_region_g, new_obj_reg);
new_obj_reg->obj_id = (metadata+i)->obj_id;
for (j = 0; j < n_region; j++) {
region_list_t *new_region_list = (region_list_t*)malloc(sizeof(region_list_t));
fread(new_region_list, sizeof(region_list_t), 1, file);
DL_APPEND(new_obj_reg->region_storage_head, new_region_list);
}

total_region += n_region;

DL_SORT((metadata+i)->storage_region_list_head, region_cmp);
} // For i

Expand Down Expand Up @@ -1866,7 +1896,6 @@ int main(int argc, char *argv[])

// Exit from the loop, start finalize process
#ifndef DISABLE_CHECKPOINT
#else
char *tmp_env_char = getenv("PDC_DISABLE_CHECKPOINT");
if (tmp_env_char != NULL && strcmp(tmp_env_char, "TRUE")==0) {
if (pdc_server_rank_g == 0) printf("==PDC_SERVER[0]: checkpoint disabled!\n");
Expand Down
173 changes: 160 additions & 13 deletions src/server/pdc_server_data.c
Original file line number Diff line number Diff line change
Expand Up @@ -4330,14 +4330,141 @@ static perr_t PDC_Server_data_io_direct(pdc_access_t io_type, uint64_t obj_id, s
FUNC_LEAVE(ret_value);
}

perr_t PDC_Server_data_write_out(uint64_t obj_id, struct pdc_region_info *region_info, void *buf, size_t unit)
int PDC_region_cache_register(uint64_t obj_id, const char *buf, size_t buf_size, const uint64_t *offset, const uint64_t *size, int ndim, size_t unit) {
pdc_obj_cache *temp;
struct pdc_region_info *temp2;
pdc_obj_cache *obj_cache = NULL;
int i;
struct pdc_region_info *region_cache;

for ( i = 0; i < obj_cache_list.obj_cache_size; ++i ) {
if (obj_cache_list.pdc_obj_cache[i].obj_id == obj_id) {
obj_cache = obj_cache_list.pdc_obj_cache + i;
}
}
if (obj_cache == NULL) {
if (obj_cache_list.obj_cache_max_size == 0) {
obj_cache_list.obj_cache_max_size = 512;
obj_cache_list.pdc_obj_cache = (pdc_obj_cache*) malloc(sizeof(pdc_obj_cache) * obj_cache_list.obj_cache_max_size);
} else {
if (obj_cache_list.obj_cache_size == obj_cache_list.obj_cache_max_size) {
obj_cache_list.obj_cache_max_size *= 2;
temp = (pdc_obj_cache*) malloc(sizeof(pdc_obj_cache) * obj_cache_list.obj_cache_max_size);
memcpy(temp, obj_cache_list.pdc_obj_cache, sizeof(pdc_obj_cache) * obj_cache_list.obj_cache_size);
}
}
obj_cache_list.pdc_obj_cache[obj_cache_list.obj_cache_size].region_obj_cache_max_size = 512;
obj_cache_list.pdc_obj_cache[obj_cache_list.obj_cache_size].region_obj_cache_size = 0;
obj_cache_list.pdc_obj_cache[obj_cache_list.obj_cache_size].region_cache = (pdc_obj_cache*) malloc(sizeof(pdc_obj_cache) * obj_cache_list.pdc_obj_cache[obj_cache_list.obj_cache_size].region_obj_cache_max_size);
obj_cache_list.obj_cache_size++;
obj_cache = obj_cache_list.pdc_obj_cache + obj_cache_list.obj_cache_size;
}
if (obj_cache->region_obj_cache_max_size == 0) {
obj_cache->region_obj_cache_max_size = 512;
obj_cache->region_cache = (struct pdc_region_info *) malloc(sizeof(struct pdc_region_info) * obj_cache->region_obj_cache_max_size);
} else {
if (obj_cache->region_obj_cache_max_size == obj_cache->region_obj_cache_size) {
obj_cache->region_obj_cache_max_size *= 2;
temp2 = (struct pdc_region_info*) malloc(sizeof(struct pdc_region_info) * obj_cache->region_obj_cache_max_size);
memcpy(temp2, obj_cache->region_cache, sizeof(struct pdc_region_info) * obj_cache->region_obj_cache_size);
}
}
region_cache = obj_cache->region_cache + obj_cache->region_obj_cache_size;
region_cache->ndim = ndim;
region_cache->offset = (uint64_t*) malloc(sizeof(uint64_t) * ndim);
region_cache->size = (uint64_t*) malloc(sizeof(uint64_t) * ndim);
region_cache->buf = (char*) malloc(sizeof(char) * buf_size);
region_cache->unit = unit;

memcpy(region_cache->offset, offset, sizeof(uint64_t) * ndim);
memcpy(region_cache->size, size, sizeof(uint64_t) * ndim);
memcpy(region_cache->buf, buf, sizeof(char) * buf_size);

obj_cache->region_obj_cache_size++;
return 0;
}

int PDC_region_flush(uint64_t obj_id) {
pdc_obj_cache *obj_cache = NULL;
int i;
struct pdc_region_info *region_cache;

for ( i = 0; i < obj_cache_list.obj_cache_size; ++i ) {
if (obj_cache_list.pdc_obj_cache[i].obj_id == obj_id) {
obj_cache = obj_cache_list.pdc_obj_cache + i;
}
}

for ( i = 0; i < obj_cache->region_obj_cache_size; ++i ) {
region_cache = obj_cache->region_cache + i;
PDC_Server_data_write_out(obj_id, region_cache, region_cache->buf, region_cache->unit);
}
return 0;
}

int PDC_region_fetch(uint64_t obj_id, struct pdc_region_info *region_info, void *buf, size_t unit) {
pdc_obj_cache *obj_cache = NULL;
int i, flag = 1;
size_t j;
struct pdc_region_info *region_cache = NULL;

for ( i = 0; i < obj_cache_list.obj_cache_size; ++i ) {
if (obj_cache_list.pdc_obj_cache[i].obj_id == obj_id) {
obj_cache = obj_cache_list.pdc_obj_cache + i;
}
}

for ( i = 0; i < obj_cache->region_obj_cache_size; ++i ) {
flag = 1;
region_cache = obj_cache->region_cache + i;
for ( j = 0; j < region_info->ndim; ++j ) {
if ( region_info->offset[j] < region_cache->offset[j] || region_info->offset[j] + region_info->size[i] > region_cache->offset[j] + region_cache->size[i] ) {
flag = 0;
}
}
if (flag) {
break;
} else {
region_cache = NULL;
}
}
PDC_region_flush(obj_id);
PDC_Server_data_read_from(obj_id, region_info, buf, unit);
return 0;
}

int PDC_region_cache_free() {
int i, j;
for ( i = 0; i < obj_cache_list.obj_cache_size; ++i ) {
for ( j = 0; j < obj_cache_list.pdc_obj_cache[i].region_obj_cache_size; ++j ) {
free(obj_cache_list.pdc_obj_cache[i].region_cache[j].offset);
free(obj_cache_list.pdc_obj_cache[i].region_cache[j].size);
free(obj_cache_list.pdc_obj_cache[i].region_cache[j].buf);
}
free(obj_cache_list.pdc_obj_cache[i].region_cache);
}
free(obj_cache_list.pdc_obj_cache);
return 0;
}

perr_t PDC_Server_data_write_out2(uint64_t obj_id, struct pdc_region_info *region_info, void *buf, size_t unit)
{
perr_t ret_value = SUCCEED;
ssize_t write_bytes = -1;
uint64_t write_bytes = -1;
data_server_region_t *region = NULL;

FUNC_ENTER(NULL);


// Write 1GB at a time
uint64_t write_size, max_write_size = 1073741824;
if(region_info->ndim >= 1)
write_size = unit*region_info->size[0];
if(region_info->ndim >= 2)
write_size *= region_info->size[1];
if(region_info->ndim >= 3)
write_size *= region_info->size[2];

region = PDC_Server_get_obj_region(obj_id);
if(region == NULL) {
printf("cannot locate file handle\n");
Expand All @@ -4352,12 +4479,13 @@ perr_t PDC_Server_data_write_out(uint64_t obj_id, struct pdc_region_info *region
}

region_list_t *storage_region = (region_list_t*)calloc(1, sizeof(region_list_t));
for (int i = 0; i < region_info->ndim; i++) {
for (size_t i = 0; i < region_info->ndim; i++) {
storage_region->start[i] = region_info->offset[i];
storage_region->count[i] = region_info->size[i];
}
storage_region->unit_size = unit;
storage_region->offset = lseek(region->fd, 0, SEEK_END);
strcpy(storage_region->storage_location, region->storage_location);

/* time_t t; */
/* struct tm tm; */
Expand All @@ -4371,18 +4499,10 @@ perr_t PDC_Server_data_write_out(uint64_t obj_id, struct pdc_region_info *region
gettimeofday(&pdc_timer_start, 0);
#endif

// Write 1GB at a time
uint64_t write_size, max_write_size = 1073741824;
if(region_info->ndim >= 1)
write_size = unit*region_info->size[0];
if(region_info->ndim >= 2)
write_size *= region_info->size[1];
if(region_info->ndim >= 3)
write_size *= region_info->size[2];

write_bytes = 0;
while (write_size > max_write_size) {
write_bytes += write(region->fd, buf, max_write_size);
buf += max_write_size;
write_size -= max_write_size;
}
write_bytes += write(region->fd, buf, write_size);
Expand All @@ -4407,6 +4527,32 @@ perr_t PDC_Server_data_write_out(uint64_t obj_id, struct pdc_region_info *region
storage_region->data_size = write_bytes;
DL_APPEND(region->region_storage_head, storage_region);

printf("==PDC_SERVER[%d]: write region %llu bytes\n", pdc_server_rank_g, storage_region->data_size);
done:
fflush(stdout);
FUNC_LEAVE(ret_value);
}

perr_t PDC_Server_data_write_out(uint64_t obj_id, struct pdc_region_info *region_info, void *buf, size_t unit)
{
perr_t ret_value = SUCCEED;

FUNC_ENTER(NULL);

// Write 1GB at a time
/*
uint64_t write_size;
if(region_info->ndim >= 1)
write_size = unit*region_info->size[0];
if(region_info->ndim >= 2)
write_size *= region_info->size[1];
if(region_info->ndim >= 3)
write_size *= region_info->size[2];
PDC_region_cache_register(obj_id, buf, write_size * unit, region_info->offset, region_info->size, region_info->ndim, unit);
*/
PDC_Server_data_write_out2(obj_id, region_info, buf, unit);

done:
fflush(stdout);
FUNC_LEAVE(ret_value);
Expand Down Expand Up @@ -4444,7 +4590,7 @@ perr_t PDC_Server_data_read_from(uint64_t obj_id, struct pdc_region_info *region
region_list_t *storage_region = NULL;
DL_FOREACH(region->region_storage_head, elt) {
flag = 0;
for (int i = 0; i < region_info->ndim; i++) {
for (size_t i = 0; i < region_info->ndim; i++) {
if (elt->start[i]> region_info->offset[i] || (elt->start[i]+elt->count[i])< region_info->offset[i]+region_info->size[i]) {
flag = 1;
break;
Expand Down Expand Up @@ -6566,6 +6712,7 @@ PDC_Server_query_evaluate_merge_opt(pdc_query_t *query, query_task_t *task, pdc_
}

// Skip non-overlap regions with the region constraint

if (region_constraint && region_constraint->ndim > 0) {
if (PDC_is_contiguous_region_overlap(region_elt, region_constraint) != 1)
continue;
Expand Down
16 changes: 16 additions & 0 deletions src/server/pdc_server_data.h
Original file line number Diff line number Diff line change
Expand Up @@ -309,6 +309,22 @@ extern char *gBinningOption;
extern int gen_fastbit_idx_g;
extern int use_fastbit_idx_g;

typedef struct {
struct pdc_region_info *region_cache;
uint64_t obj_id;
int region_obj_cache_size;
int region_obj_cache_max_size;
int unit;
} pdc_obj_cache;

typedef struct {
pdc_obj_cache *pdc_obj_cache;
int obj_cache_size;
int obj_cache_max_size;
} pdc_cache;

pdc_cache obj_cache_list;

/***************************************/
/* Library-private Function Prototypes */
/***************************************/
Expand Down
1 change: 1 addition & 0 deletions src/tests/read_write_col_perf.c
Original file line number Diff line number Diff line change
Expand Up @@ -309,6 +309,7 @@ int main(int argc, char **argv) {
}
}
#if PDC_TIMING == 1
MPI_Barrier(MPI_COMM_WORLD);
PDC_timing_report("read");
#endif
// close a container
Expand Down

0 comments on commit 3dc8534

Please sign in to comment.