-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathflame.mjs
210 lines (178 loc) · 6.71 KB
/
flame.mjs
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
import axios from 'axios'
import http from 'node:http'
import * as url from 'url';
// Settings read from the process environment
const IS_RUNNER = process.env.IS_RUNNER
const FLY_API_TOKEN = process.env.FLY_API_TOKEN
const FLY_APP_NAME = process.env.FLY_APP_NAME
const FLY_IMAGE_REF = process.env.FLY_IMAGE_REF
const IS_LOCAL_DEV = process.env.IS_LOCAL_DEV
// Port the "runner" HTTP server listens on (also used to build the runner URL)
const port = 5500
// Idle time, in milliseconds, after which a "runner" process exits (5 minutes)
const timeUntilStop = 1000 * 60 * 5
// Handle of the pending shutdown timer; assigned by scheduleStop()
let exitTimeout
/*
Process groups are ways to group Fly Machines by their start commands.
Since our "runner" Machines will use a different start command than the rest of the app,
we can set the process group here to target only the "runner" Machines.

The group name is derived from the image reference: the deployment id when the
reference contains `:deployment-`, otherwise a lower-cased base64 encoding of the
whole reference.
*/
// FLY_IMAGE_REF can be unset (for example outside of Fly.io); fall back to an
// empty string so that importing this module does not throw.
const flyImageRef = FLY_IMAGE_REF || ''
let processGroup;
if (flyImageRef.includes(':deployment-')) {
  // `toLowerCase` (not the locale-aware variant) keeps the name identical on every host locale
  const deploymentId = flyImageRef.split(':deployment-').pop().toLowerCase()
  processGroup = `runner-${deploymentId}`
} else {
  // `Buffer.from` replaces the deprecated `new Buffer(string)` constructor.
  // NOTE(review): base64 output can contain '+', '/' and '=', and this name is later
  // used as part of a hostname — confirm that is acceptable for such image references.
  processGroup = `runner-${Buffer.from(flyImageRef).toString('base64').toLowerCase()}`
}
/*
axios instance for Fly.io's Machines API, scoped to this app and sending the API token
as a Bearer credential. It is used to create "runner" Machines and to wait for them to start.
*/
const machinesService = axios.create({
  headers: { Authorization: `Bearer ${FLY_API_TOKEN}` },
  baseURL: `https://api.machines.dev/v1/apps/${FLY_APP_NAME}`
})
/*
Base URL of the "runner": localhost when `IS_LOCAL_DEV` is set, otherwise the
Fly.io internal DNS name of the "runner" process group.
The internal name allows us to reach the "runner" Machines without exposing them
to the public internet.
*/
const runnerBaseUrl = IS_LOCAL_DEV
  ? `http://localhost:${port}`
  : `http://${processGroup}.process.${FLY_APP_NAME}.internal:${port}`
/*
axios instance used for every request sent to the "runner" Machines
*/
const runnerService = axios.create({ baseURL: runnerBaseUrl })
/*
`IS_RUNNER` is an environment variable that we'll set on our "runner" Machines
If it's set, we'll start a new HTTP server that will listen for requests to execute code

- GET requests are an availability check and are answered with `{ok: true}`.
- Any other request carries a JSON body `{filename, args}`: the default export of
  `filename` is called with `args` and the result is returned as `{___result: ...}`.
  A JSON-encoded `jsonArgs` string is accepted as an alternative to `args`.
- A failure while handling the request is logged and answered with status 500
  and `{error: message}` instead of leaving the request hanging.

NOTE(review): the handler imports whatever module path the request names, so the
server must only be reachable by trusted callers — confirm the network setup.
*/
if (IS_RUNNER) {
  // Writes `payload` as a JSON response with the given HTTP status code
  const sendJson = (response, statusCode, payload) => {
    // Serialize first so that a serialization failure leaves the response untouched
    const json = JSON.stringify(payload)
    response.writeHead(statusCode, { 'Content-Type': 'application/json' })
    response.end(json)
  }

  // Extracts the argument list from a parsed request body
  const readArgs = ({ args, jsonArgs }) => {
    if (Array.isArray(args)) {
      return args
    }
    if (typeof jsonArgs === 'string') {
      return JSON.parse(jsonArgs)
    }
    return []
  }

  const requestHandler = (request, response) => {
    // Every request counts as activity and restarts the idle shutdown timer
    scheduleStop()
    console.info(`Received ${request.method} request`)

    // This simple GET request is used to check if the "runner" Machine is available
    if (request.method === 'GET') {
      sendJson(response, 200, { ok: true })
      return
    }

    // Collect raw chunks and decode once at the end, so multi-byte characters
    // split across chunks are decoded correctly
    const chunks = []
    request.on('data', (chunk) => {
      chunks.push(chunk)
    })

    /*
    When the request ends, we'll parse the JSON body to get the
    filename and arguments for the file containing our flame-wrapped function.
    */
    request.on('end', async () => {
      try {
        const payload = JSON.parse(Buffer.concat(chunks).toString('utf8'))
        const mod = await import(payload.filename)
        const result = await mod.default(...readArgs(payload))
        sendJson(response, 200, { ___result: result })
      } catch (err) {
        console.error('Failed to run flame-wrapped function', err)
        sendJson(response, 500, { error: err instanceof Error ? err.message : String(err) })
      }
    })
  }

  // Start the HTTP server for our "runner" Machine
  const server = http.createServer(requestHandler)
  // Listen failures (e.g. port already in use) are reported through the 'error'
  // event; the `listen` callback itself receives no error argument
  server.on('error', (err) => {
    console.log('something bad happened', err)
    process.exitCode = 1
  })
  server.listen(port, () => {
    console.log(`Server is listening on ${port}`)
    scheduleStop()
  })
}
/*
`flame` wraps a function so that it runs on a "runner" Machine instead of the current one.

- On a "runner" Machine (`IS_RUNNER` set) the original function is returned unchanged.
- Anywhere else, an async function is returned that makes sure a runner is available
  (spawning one with `config.guest` if none answers) and then asks the runner to execute
  the module located at `config.path` with the given arguments.

Because the wrapped module is loaded both by the calling app and by the runner,
`flame` ends up being called on both sides.
*/
export default function flame(originalFunc, config) {
  if (!IS_RUNNER) {
    // `config.path` is a file URL; the runner needs a plain filesystem path to import
    const filename = url.fileURLToPath(config.path);

    return async function (...args) {
      const hasRunner = await checkIfThereAreRunners()
      if (!hasRunner) {
        await spawnAnotherMachine(config.guest)
      }
      return await execOnMachine(filename, args)
    }
  }

  // Already on a "runner" machine: nothing to wrap
  return originalFunc
}
/*
Creates a new "runner" machine through Fly.io's Machines API, using the same image as the
current machine, and resolves with the created machine once it is reported as started.
If you aren't deploying a Fly app, replace these API calls with the equivalent ones of
your cloud provider.

The new machine gets `IS_RUNNER=1` in its environment so it knows it is a "runner",
and the `fly_process_group` metadata so that only "runner" machines can be targeted.
`guest` is passed through unchanged as the machine's `guest` configuration.
*/
async function spawnAnotherMachine(guest) {
  // This very file starts the "runner" HTTP server, so it is the command the machine runs
  const flameLibraryPath = url.fileURLToPath(import.meta.url);

  const machineConfig = {
    auto_destroy: true,
    image: FLY_IMAGE_REF,
    guest,
    env: { IS_RUNNER: "1" },
    processes: [
      { name: "runner", entrypoint: ['node'], cmd: [flameLibraryPath] }
    ],
    metadata: { fly_process_group: processGroup }
  }

  const createResponse = await machinesService.post('/machines', { config: machineConfig })
  const machine = createResponse.data

  // Block until the Machines API reports the machine as started (the wait call carries timeout=60)
  await machinesService.get(`/machines/${machine.id}/wait?timeout=60&state=started`)
  return machine
}
/*
Probes the "runner" with a simple GET request.
Resolves to a truthy value when the runner answers with status 200 and an `ok` field,
and to `false` when the request fails for any reason.
*/
async function checkIfThereAreRunners() {
  let isAvailable = false
  try {
    const { status, data } = await runnerService.get('/')
    isAvailable = status === 200 && data.ok
  } catch {
    // Any failure while probing is treated as "no runner available"
    isAvailable = false
  }
  return isAvailable
}
/*
This function sends a POST request to the "runner" machine, which will result in our original function being executed.
It sends the filename of the file containing our flame-wrapped function and the arguments to call it with.
It then returns the result of the execution.

@param {string} filename - filesystem path of the module whose default export should be run
@param {Array} args - arguments to call the function with
@returns {Promise<*>} the `___result` field of the runner's response
*/
async function execOnMachine(filename, args) {
  // The runner's request handler reads the arguments from the `args` key of the body,
  // so they are sent under that name rather than as a pre-stringified `jsonArgs` field.
  const execRes = await runnerService.post('/', {
    filename,
    args
  })
  // Return the result of our original function to the main app Machine
  return execRes.data.___result
}
// This function (re)schedules the "runner" Machine to stop after `timeUntilStop` milliseconds.
// Calling it again cancels the previously scheduled stop, so the timer restarts from zero.
function scheduleStop() {
  // The timer is created with setTimeout, so cancel it with the matching clearTimeout
  clearTimeout(exitTimeout)
  exitTimeout = setTimeout(() => {
    process.exit(0)
  }, timeUntilStop)
  console.info(`Server will stop in ${timeUntilStop}ms`)
}