-
Notifications
You must be signed in to change notification settings - Fork 14
/
Copy pathsend-request.js
146 lines (136 loc) · 3.66 KB
/
send-request.js
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
import http from "http";
import { uuid } from "mu";
// Fallback delay, in milliseconds, waited before a failed request is sent
// again when the entry does not configure a retry timeout of its own.
const DEFAULT_RETRY_TIMEOUT = 250;
/**
 * Serializes change sets into the JSON request body for the configured
 * resource format.
 *
 * Supported values of `options.resourceFormat`:
 * - `"v0.0.1"`: an array holding one `{ inserts, deletes }` object per change
 *   set, copied from the change set's `insert` and `delete` properties.
 * - `"v0.0.0-genesis"`: a single `{ delta: { inserts, deletes } }` object in
 *   which the triples of all change sets are concatenated and reduced to
 *   `{ s, p, o }`, i.e. the `.value` of their subject, predicate and object.
 *
 * @param {Array<{insert?: Array<object>, delete?: Array<object>}>} changeSets
 *   Change sets to serialize.
 * @param {{resourceFormat: string}} options Options of the matching entry.
 * @returns {string} The JSON-encoded body.
 * @throws {string} A message naming the format when it is not supported.
 */
function formatChangesetBody(changeSets, options) {
  const { resourceFormat } = options;

  if (resourceFormat === "v0.0.1") {
    return JSON.stringify(
      changeSets.map((change) => ({
        inserts: change.insert,
        deletes: change.delete,
      }))
    );
  }

  if (resourceFormat === "v0.0.0-genesis") {
    // [{delta: {inserts, deletes}]
    const toTerseTriple = ({ subject, predicate, object }) => ({
      s: subject.value,
      p: predicate.value,
      o: object.value,
    });
    // A change set without an `insert` or `delete` list contributes no
    // triples instead of making the destructuring above fail on `undefined`.
    const collectTriples = (key) =>
      changeSets.flatMap((change) => change[key] ?? []).map(toTerseTriple);

    return JSON.stringify({
      // graph: Not available
      delta: {
        inserts: collectTriples("insert"),
        deletes: collectTriples("delete"),
      },
    });
  }

  // NOTE(review): a bare string (no stack trace) is thrown here; it is kept
  // so that what callers receive on an unknown format does not change.
  throw `Unknown resource format ${resourceFormat}`;
}
/**
 * Inspects the response of a delta delivery and re-sends the request when the
 * failure looks transient and the retry budget is not exhausted.
 *
 * Nothing happens for a response with a truthy `ok`. Otherwise the failure is
 * logged, and the request is retried only when `retriesLeft` is a positive
 * number and the status is missing/zero or at least 500. A retry waits for
 * the configured timeout and then calls `sendRequest` with the budget
 * decremented by one.
 *
 * @param {{ok: boolean, status?: number}} response Response of the request.
 * @param {object} entry Entry whose `callback.method` / `callback.url` were
 *   called.
 * @param {Array<object>} changeSets Change sets that were sent.
 * @param {*} muCallIdTrail Passed through unchanged to `sendRequest`.
 * @param {*} muSessionId Passed through unchanged to `sendRequest`.
 * @param {object} extraHeaders Passed through unchanged to `sendRequest`.
 * @param {number} retriesLeft Remaining number of retries.
 * @returns {Promise<void>}
 */
const handleResponse = async (
  response,
  entry,
  changeSets,
  muCallIdTrail,
  muSessionId,
  extraHeaders,
  retriesLeft
) => {
  if (response.ok) {
    // currently no need to act on a successful response
    return;
  }

  const { method, url } = entry.callback;
  console.log(
    `Call to ${method} ${url} likely failed. Received status ${response.status}.`
  );

  // `> 0` rather than `!== 0`: a negative, fractional, NaN or missing budget
  // would otherwise never reach exactly 0 and keep retrying forever.
  const hasRetryBudget = retriesLeft > 0;
  // A known status below 500 is not retried.
  const isNonServerErrorStatus =
    Boolean(response.status) && response.status < 500;
  if (!hasRetryBudget || isNonServerErrorStatus) {
    console.log(`NOT RETRYING`);
    return;
  }

  const remainingRetries = retriesLeft - 1;
  console.log(`RETRYING (${remainingRetries} left)`);

  // The retry count is read from `entry.options` (see `sendRequest`), so a
  // `retryTimeout` placed next to it is honoured as well; the top-level
  // `entry.retryTimeout` keeps working as before.
  // NOTE(review): confirm where the configuration is meant to carry it.
  const retryDelay =
    entry.options?.retryTimeout || entry.retryTimeout || DEFAULT_RETRY_TIMEOUT;

  // in this case we want to retry, our error handling will deal with this.
  await new Promise((resolve) => setTimeout(resolve, retryDelay));
  await sendRequest(
    entry,
    changeSets,
    muCallIdTrail,
    muSessionId,
    extraHeaders,
    remainingRetries
  );
};
// One keep-alive agent shared by every request. Creating a fresh agent per
// call prevents any connection reuse and leaves idle sockets behind in agents
// that are never used again.
const sharedKeepAliveAgent = new http.Agent({
  keepAlive: true,
});

/**
 * Sends the change sets to the callback of `entry`.
 *
 * Nothing is sent when `changeSets` is empty. Otherwise a request is made to
 * `entry.callback.url` with `entry.callback.method`; the body is only set
 * when `entry.options.resourceFormat` is configured. The response is passed
 * to `handleResponse`, which may retry. Errors raised while sending or while
 * handling the response are logged and not rethrown.
 *
 * @param {object} entry Entry with `callback.method`, `callback.url` and
 *   optional `options` (`resourceFormat`, `retry`).
 * @param {Array<object>} changeSets Change sets to deliver; the
 *   `allowedGroups` of the first one, when truthy, is sent as the
 *   `MU-AUTH-ALLOWED-GROUPS` header.
 * @param {*} muCallIdTrail Value of the `mu-call-id-trail` header.
 * @param {*} muSessionId Value of the `mu-session-id` header.
 * @param {object} [extraHeaders={}] Additional headers; the headers set by
 *   this function take precedence over them.
 * @param {number} [retriesLeft] Remaining retries; when `undefined` it is
 *   initialised from `entry.options.retry`, falling back to 0.
 * @returns {Promise<void>}
 */
export async function sendRequest(
  entry,
  changeSets,
  muCallIdTrail,
  muSessionId,
  extraHeaders = {},
  retriesLeft = undefined
) {
  const { method, url } = entry.callback;

  if (!changeSets.length) {
    if (
      process.env["DEBUG_DELTA_SEND"] ||
      process.env["DEBUG_DELTA_NOT_SENDING_EMPTY"]
    ) {
      console.log(`Changeset empty. Not sending to ${method} ${url}`);
    }
    return;
  }

  // Only the initial call arrives without a budget; retries pass it along.
  const retryBudget =
    retriesLeft === undefined ? entry.options?.retry || 0 : retriesLeft;

  // construct the requestObject
  const headers = {
    ...extraHeaders,
    "Content-Type": "application/json",
    "mu-call-id-trail": muCallIdTrail,
    "mu-call-id": uuid(),
    "mu-session-id": muSessionId,
  };
  const [firstChangeSet] = changeSets;
  if (firstChangeSet.allowedGroups) {
    headers["MU-AUTH-ALLOWED-GROUPS"] = firstChangeSet.allowedGroups;
  }

  let body;
  if (entry.options && entry.options.resourceFormat) {
    // we should send contents
    body = formatChangesetBody(changeSets, entry.options);
  }

  if (process.env["DEBUG_DELTA_SEND"]) {
    console.log(`Executing send ${method} to ${url}`);
  }

  try {
    // NOTE(review): no `fetch` import is visible in this file; whether the
    // `agent` option is honoured depends on the fetch implementation in
    // scope — confirm.
    const response = await fetch(url, {
      method,
      headers,
      body,
      agent: sharedKeepAliveAgent,
    });
    await handleResponse(
      response,
      entry,
      changeSets,
      muCallIdTrail,
      muSessionId,
      extraHeaders,
      retryBudget
    );
  } catch (error) {
    console.log(error);
  }
}