-
Notifications
You must be signed in to change notification settings - Fork 2
/
WorkerUtil.h
76 lines (56 loc) · 2.78 KB
/
WorkerUtil.h
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
// Include guard for WorkerUtil.h.
// The macro deliberately has no leading underscore: identifiers that begin
// with an underscore followed by an uppercase letter are reserved for the
// C implementation, so a guard spelled that way may clash with the toolchain.
#ifndef WORKER_UTIL_H
#define WORKER_UTIL_H
#include "QueryPlanner.h"
#include "Main.h"
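// Superseded prototypes, kept commented out for reference only. The active
// declarations further down in this header use different parameter lists
// (createResultMessage, parseResultMessage) or different parameter names
// (parseSubQMessage, parseFinalKeysMessage).
//int createResultMessage(char **message, int worker_id, char *results[], short num_results);
//void parseResultMessage(char *message, short *num_results, char **results[], char **result_set);
//void parseSubQMessage(char *message, short *in_degree, short *out_degree, int **ranks,
// short *num_results, char **results[], char **subQ);
//void parseFinalKeysMessage(char *message, short *final_num_results, char **final_results[]);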
// Argument bundle for the partial-results step.
//
// The members match, name for name, type for type and in the same order, the
// parameter list of recievePartialResults() declared later in this header.
// NOTE(review): presumably a pointer to this struct is what travels through
// the `void *arg` of recievePartialResultsT() / recievePartialResultsT2();
// the definitions are not in this header -- confirm in the implementation.
//
// Every member except num_results_recvd and my_in_degree is a pointer. This
// header does not say who allocates or frees the pointed-to storage.
typedef struct partial_results_struct
{
// Same names and types as the comm_size / comm_ranks parameters of
// createCommunicatorRanksArray().
// NOTE(review): presumably the rank count and rank list of the neighbour
// communicator -- TODO confirm.
int *comm_size;
int **comm_ranks;
// Pointer to an MPI communicator handle; same type as the first parameter of
// createCommunicator().
MPI_Comm *NGBRS_COMM;
// ResultInfo is declared outside this header.
// NOTE(review): the name suggests the argument handed to a second thread -- verify.
ResultInfo *tid2_arg;
// NOTE(review): the ngbr_* members look like data received from a neighbour
// (raw metadata text, key count, key strings) -- confirm.
char **ngbr_metadata;
short *ngbr_num_keys;
char ***ngbr_keys;
// NOTE(review): presumably this worker's own sub-query result text -- confirm.
char **my_result_set;
// Pointer to a pthread thread id.
pthread_t *ngbr_tid;
// NOTE(review): presumably a message length in bytes; units are not stated here.
int *msg_len;
// The only two by-value members: a received-results counter and this worker's
// in-degree, going by their names.
int num_results_recvd;
int my_in_degree;
// NOTE(review): my_* presumably mirrors ngbr_* for this worker's own keys -- confirm.
short *my_num_keys;
char ***my_keys;
char **ngbr_result_set;
// Same pointee types as the outputs of parseFinalKeysMessage(), but with one
// more level of indirection on final_keys (char *** here, char **[] there).
short *final_num_keys;
char ***final_keys;
// Pointer to a second pthread thread id.
pthread_t *subQ_tid;
} PartialResultsStruct;
// Message build / parse helpers (grouping inferred from the names; only the
// signatures are visible in this header).
// NOTE(review): in the parse* functions every parameter after `message` is a
// pointer to a scalar, array or string, which looks like an output filled
// from the message -- confirm against the definitions, including who owns
// the memory those outputs end up pointing at.

// Parses `message` into a key count and a key array (per the parameter names).
void parseFinalKeysMessage(char *message, short *final_num_keys, char **final_keys[]);
// Parses `message` into in/out degree, a rank array, a key count, a key array
// and a sub-query string (per the parameter names).
void parseSubQMessage(char *message, short *in_degree, short *out_degree, int **ranks,
short *num_keys, char **keys[], char **subQ);
// Builds a message from `num_keys` entries of `keys` and stores it through `message`.
// NOTE(review): the int return is presumably the message length (compare the
// `int len` parameter of sendMyMetadata()) -- verify.
int createMetadataMessage(char **message, char *keys[], short num_keys);
// Builds a result message for `worker_id` and stores it through `message`.
// NOTE(review): int return presumably a length as well (compare sendMyResult()) -- verify.
int createResultMessage(char **message, int worker_id);
// Counterpart of createMetadataMessage(): same key count / key array shape, as outputs.
void parseMetadataMessage(char *message, short *num_keys, char **keys[]);
// Counterpart of createResultMessage(): yields a result-set string.
void parseResultMessage(char *message, char **result_set);
// Node is declared outside this header.
// NOTE(review): the int return is presumably used as a boolean (non-zero when
// `node` has a join neighbour) -- confirm in the definition.
int hasJoinNeighbor(Node *node);
// Sends a metadata buffer of `len` bytes or characters (units not stated here)
// on behalf of `source` to the `num_ngbrs` ranks listed in `ngbrs_ranks`.
// NOTE(review): description inferred from parameter names only; the transport
// (presumably MPI point-to-point) is not visible in this header.
void sendMyMetadata(char *my_metadata, int len, int source, int num_ngbrs, int *ngbrs_ranks);
// Same shape as sendMyMetadata(), for the sub-query result and the
// `out_degree` ranks in `out_neighbors_ranks`.
void sendMyResult(char *my_subQ_result, int len, int source, int out_degree, int *out_neighbors_ranks);
// Neighbour-communicator setup (grouping inferred from the names; definitions
// are not in this header).
// NOTE(review): spelling is inconsistent across this API -- "recieve...",
// "receive..." and "recv..." all appear. The names are part of the public
// interface, so any rename has to be coordinated with every caller.

// Takes this rank plus its neighbour ranks; `comm_size` and `comm_ranks` are
// pointer parameters, presumably outputs -- TODO confirm who frees *comm_ranks.
void createCommunicatorRanksArray(int my_rank, int *my_ngbrs_ranks, int my_num_ngbrs,
int *comm_size, int **comm_ranks);
// Has the pthread start-routine signature (void * in, void * out).
// NOTE(review): the concrete type behind `arg` is not visible here.
void *receiveNgbrResult(void *arg);
// Requesting side: takes the neighbour list and the communicator rank list by value.
void sendCreateCommunicatorRequest(int num_ngbrs, int *ngbrs_ranks, int comm_size, int *comm_ranks);
// Same parameter list as sendCreateCommunicatorRequest().
void recieveCommunicatorCreationAck(int num_ngbrs, int *ngbrs_ranks, int comm_size, int *comm_ranks);
// Responding side: `num_ngbrs` / `ngbrs_ranks` are pointer parameters, presumably outputs.
// NOTE(review): meaning of the int return value is not documented -- verify.
int recvCreateCommunicatorRequest(int *num_ngbrs, int **ngbrs_ranks);
// Same parameter list and return type as recvCreateCommunicatorRequest().
int sendCreateCommunicatorAck(int *num_ngbrs, int **ngbrs_ranks);
// Takes a pointer to an MPI communicator handle and a rank list.
// NOTE(review): presumably creates the communicator over those ranks and
// stores it in *NGBRS_COMM -- confirm.
void createCommunicator(MPI_Comm *NGBRS_COMM, int num_ngbrs, int *ngbrs_ranks);
// Takes 18 parameters that are identical -- in name, type and order -- to the
// members of PartialResultsStruct defined earlier in this header.
// NOTE(review): what the function does with them is not visible here; see the
// implementation file. Keep this list and the struct in sync when either changes.
void recievePartialResults(int *comm_size, int **comm_ranks, MPI_Comm *NGBRS_COMM,
ResultInfo *tid2_arg, char **ngbr_metadata, short *ngbr_num_keys, char ***ngbr_keys,
char **my_result_set, pthread_t *ngbr_tid, int *msg_len, int num_results_recvd, int my_in_degree,
short *my_num_keys, char ***my_keys, char **ngbr_result_set, short *final_num_keys,
char ***final_keys, pthread_t *subQ_tid);
// Both declarations below have the pthread start-routine signature
// (void * in, void * out).
// NOTE(review): presumably thread entry points whose `arg` is a
// PartialResultsStruct * -- confirm in the definitions, along with what the
// returned pointer means.
void *recievePartialResultsT(void *arg);
void *recievePartialResultsT2(void *arg);
#endif