-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathdaemon.c
346 lines (301 loc) · 11.3 KB
/
daemon.c
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
#define SELECT_EXPR "json_agg(daas_result)"
#define ERROR_EXPR "[{\"status\": \"Error\"}]"
#define MAX_PARAMETER_COUNT 9
#define ACCESSTOKEN_COOKIE "Authorization"
// AUTHORIZATION_QUERY is the key of the query to be executed for authorization
// AUTHORIZATION_QUERY gets ACCESSTOKEN_COOKIE and the requested command (query key / prepared statement name) and returns username
#define AUTHORIZATION_QUERY "~"
#if defined(ACCESSTOKEN_COOKIE) && defined(AUTHORIZATION_QUERY)
#define REQUIRE_AUTH
#endif
#define EXIT_BAD_ARG 1
#define EXIT_FAILED_DAEMONIZE 1
#define EXIT_THREAD_CREATION_FAILED 3
#define EXIT_MHD_start_daemon_FAILED 4
#define EXIT_DB_CONNECTION_FAILED 5
#define EXIT_PQ_PREPARE_FAILED 6
#include <microhttpd.h>
#include <stdlib.h>
#include <string.h>
#include <stdio.h>
#include <pthread.h>
#include <signal.h>
#include <postgresql/libpq-fe.h>
struct query_list_el {
#ifdef REQUIRE_AUTH
char * accesstoken;
#endif
char * query;
char * result;
struct query_list_el * next;
};
typedef struct query_list_el query_item;
struct {
pthread_cond_t query;
pthread_cond_t result;
pthread_mutex_t lock;
query_item * head;
} query_list;
int exiting;
void sig_handler(int signum) {
exiting = 1;
pthread_cond_broadcast(&(query_list.query));
}
/**********************************************************************
*
* void* query_thread
*
**********************************************************************/
void* query_thread(void *connectionString) {
PGconn *conn = NULL;
conn = PQconnectdb(connectionString);
if (PQstatus(conn) != CONNECTION_OK) {
printf("Connection to database failed!\n");
PQfinish(conn);
exit(EXIT_DB_CONNECTION_FAILED);
}
/*
* Creating Prepared Statements
*/
PGresult *res = NULL;
res = PQexec(conn, "SELECT key, 'WITH daas_results AS ( ' || query || "
" CASE WHEN left(upper(ltrim(query)), 6) = 'SELECT'"
" THEN ' ' "
" ELSE ' RETURNING * ' "
" END"
" || ') SELECT " SELECT_EXPR
" FROM daas_results AS daas_result' FROM daas"
";");
if (PQresultStatus(res) != PGRES_TUPLES_OK) {
printf("Could not get list of queries\n\t%s\n", PQresultErrorMessage(res) );
PQclear(res);
PQfinish(conn);
exit(EXIT_PQ_PREPARE_FAILED);
}
int queryNo;
PGresult *prepareRes = NULL;
for(queryNo=0; queryNo<PQntuples(res); queryNo++) {
prepareRes = PQprepare(conn, PQgetvalue(res, queryNo, 0), PQgetvalue(res, queryNo, 1), 1, NULL); // const Oid *paramTypes);
if (PQresultStatus(prepareRes) != PGRES_COMMAND_OK) {
PQfinish(conn);
exit(EXIT_PQ_PREPARE_FAILED);
}
}
PQclear(prepareRes);
PQclear(res);
/*
* Processing queries
*/
const char *paramValues[MAX_PARAMETER_COUNT];
#ifdef REQUIRE_AUTH
const char *authParamValues[2];
#endif
while(!exiting) {
pthread_mutex_lock(&(query_list.lock));
while (!query_list.head) {
if(exiting) break;
pthread_cond_wait(&(query_list.query), &(query_list.lock));
}
if(exiting) break;
// Get the query in the head, advance the list and release the lock
// FIXME If somethng goes wrong, are we going to loose this query?
query_item * head = query_list.head;
query_list.head = query_list.head->next;
pthread_mutex_unlock(&(query_list.lock));
// Do the SQL query
// head->result might me pointing to strdup of the result, so malloc may or may not be needed
// head->result = malloc(SQL_RESULT_BUF_SIZE+1);
// Assuming query type (preparestatement name) is one character
// /a/Parameter0
// 0123
// Moved the following after finding parameters
// head->query[2]=0;
paramValues[0] = &(head->query[3]);
/***************************************
* REQUIRE_AUTH
***************************************/
#ifdef REQUIRE_AUTH
const char *authParamValues[2];
authParamValues[0] = head->accesstoken;
authParamValues[1] = &(head->query[1]);
res = PQexecPrepared(conn, // PGconn *conn,
AUTHORIZATION_QUERY, // const char *stmtName,
2, // int nParams,
authParamValues, // const char * const *paramValues,
NULL, // const int *paramLengths,
NULL, // const int *paramFormats, /* all parameters are presumed to be text strings. */
0); // int resultFormat);
int authorized = 0;
if (PQresultStatus(res) == PGRES_TUPLES_OK && PQntuples(res)==1) {
authorized = strlen(PQgetvalue(res, 0, 0));
}
PQclear(res);
if (!authorized) {
head->result = strdup("[{\"status\": \"Not authorized\"}]");
// Result is ready (actually, failure is ready)
// Detach the head query_item: Already done
// query_list.head = query_list.head->next;
pthread_cond_broadcast(&(query_list.result));
// pthread_mutex_unlock(&(query_list.lock));
continue;
}
#endif
/***************************************
* Actual Queries
***************************************/
int nParams = 0;
char *lastPtr = &(head->query[2]); //head->query+3;
while(nParams < MAX_PARAMETER_COUNT && (lastPtr = strchr( lastPtr, '/' )) != NULL ) {
*lastPtr = 0; // NULL terminate previous part
paramValues[nParams++] = ++lastPtr;
}
head->query[2]=0;
res = PQexecPrepared(conn, // PGconn *conn,
&(head->query[1]), // const char *stmtName,
nParams, // int nParams,
paramValues, // const char * const *paramValues,
NULL, // const int *paramLengths,
NULL, // const int *paramFormats, /* all parameters are presumed to be text strings. */
0); // int resultFormat);
if (PQresultStatus(res) != PGRES_TUPLES_OK) {
head->result = strdup(ERROR_EXPR);
} else if(PQntuples(res)<1) {
head->result = strdup("{}");
} else {
head->result = strdup(PQgetvalue(res, 0, 0));
}
PQclear(res);
// Result is ready
// Detach the head query_item: Already done
// query_list.head = query_list.head->next;
pthread_cond_broadcast(&(query_list.result));
// pthread_mutex_unlock(&(query_list.lock));
}
PQfinish(conn);
pthread_exit(NULL);
}
/**********************************************************************
*
* int con_handler
*
**********************************************************************/
static int con_handler(void * cls,
struct MHD_Connection * connection,
const char * url,
const char * method,
const char * version,
const char * upload_data,
size_t * upload_data_size,
void ** ptr) {
static int dummy;
struct MHD_Response * response;
int ret;
// OPTIMIZATION FANATIC!
// if (0 != strcmp(method, "GET")) return MHD_NO; /* unexpected method */
if (*method != 'G' || *(method+1) != 'E' || *(method+2) != 'T' || *(method+3) != 0) return MHD_NO; /* unexpected method */
if (&dummy != *ptr) {
/* The first time only the headers are valid,
* do not respond in the first round... */
/* http://www.gnu.org/software/libmicrohttpd/manual/libmicrohttpd.html#microhttpd_002dpost */
*ptr = &dummy;
return MHD_YES;
}
if (0 != *upload_data_size) return MHD_NO; /* upload data in a GET!? */
#ifdef REQUIRE_AUTH
const char * accesstoken = MHD_lookup_connection_value(connection, MHD_COOKIE_KIND, ACCESSTOKEN_COOKIE);
if (!accesstoken) {
return MHD_NO;
}
#endif
pthread_mutex_lock(&(query_list.lock));
query_item * new_query = malloc(sizeof(query_item));
#ifdef REQUIRE_AUTH
new_query->accesstoken = strdup(accesstoken);
#endif
new_query->query = strdup(url);
new_query->result = NULL;
new_query->next = NULL;
if(query_list.head) {
query_item * last_query = query_list.head;
while (last_query->next) {
last_query = last_query->next;
}
last_query->next = new_query;
} else {
query_list.head = new_query;
}
pthread_cond_signal(&(query_list.query));
// Waiting for the result
while (!new_query->result) {
pthread_cond_wait(&(query_list.result), &(query_list.lock));
}
pthread_mutex_unlock(&(query_list.lock)); // We do not need the lock anymore
*ptr = NULL; /* clear context pointer */
response = MHD_create_response_from_buffer(strlen(new_query->result),
new_query->result,
MHD_RESPMEM_MUST_FREE);
free(new_query->accesstoken);
free(new_query->query);
/* free(new_query->result); // Passed it as the response creation function with MHD_RESPMEM_MUST_FREE Attr */
free(new_query);
/* Disable caching */
MHD_add_response_header(response, "cache-control", "private, max-age=0, no-cache");
ret = MHD_queue_response(connection, MHD_HTTP_OK, response);
MHD_destroy_response(response);
return ret;
}
/**********************************************************************
*
* int main
*
**********************************************************************/
int main(int argc, char ** argv) {
struct MHD_Daemon * d;
if (argc != 3) {
printf("Usage:\n\t%s PORT CONNECTION_STRING\n", argv[0]);
return EXIT_BAD_ARG;
}
char connectionString[512];
snprintf(connectionString, 511, "%s", argv[2]);
if (strlen(connectionString)>510) {
printf("Connection String too long:\n\t%s\n", argv[2]);
return EXIT_BAD_ARG;
}
exiting = 0;
signal(SIGINT, sig_handler);
if (close(0)) {
return EXIT_FAILED_DAEMONIZE;
}
/* unbuffered STD OUT */
setvbuf(stdout, NULL, _IONBF, 0);
/* line buffered
* setlinebuf(stdout)
*/
pthread_mutex_init(&(query_list.lock), NULL);
query_list.head = NULL;
pthread_t tid;
int err = pthread_create(&tid, NULL, &query_thread, connectionString);
if (err != 0) {
printf("\ncan't create thread: [%s]", strerror(err));
// Maybe we can use a cleanup function
pthread_mutex_destroy(&(query_list.lock));
return EXIT_THREAD_CREATION_FAILED;
}
d = MHD_start_daemon(MHD_USE_THREAD_PER_CONNECTION,
atoi(argv[1]),
NULL,
NULL,
&con_handler,
NULL,
MHD_OPTION_END);
if (d == NULL) return EXIT_MHD_start_daemon_FAILED;
// Inform Query execution thread
// exiting = 1;
/* Wait for thread(s)
*/
pthread_join(tid, NULL);
pthread_cond_broadcast(&(query_list.query));
pthread_mutex_destroy(&(query_list.lock));
MHD_stop_daemon(d);
return 0;
}