-
Notifications
You must be signed in to change notification settings - Fork 3
/
Scheduling.ml
486 lines (429 loc) · 13.5 KB
/
Scheduling.ml
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
(* This file is free software, part of nunchaku. See file "license" for more details. *)
(** {1 Scheduling of sub-processes} *)
module E = CCResult
(*$inject
module E = CCResult
*)
type 'a or_error = ('a, exn) CCResult.t
let section = Utils.Section.make "scheduling"
module MVar = struct
type 'a t = {
mutable v: 'a;
lock: Mutex.t;
}
let make v = {v; lock=Mutex.create(); }
let with_ v ~f =
Mutex.lock v.lock;
CCFun.finally ~f ~h:(fun () -> Mutex.unlock v.lock)
let get v = with_ v ~f:(fun () -> v.v)
let set v x = with_ v ~f:(fun () -> v.v <- x)
let update ~f v =
with_ v
~f:(fun () ->
let x, res = f v.v in
v.v <- x;
res)
end
module Fut = struct
type 'a final_state =
| Stopped
| Done of 'a
| Fail of exn
(* bag of tasks to execute one after the other *)
type tasks_bag = (unit -> unit) Queue.t
type 'a on_res_callback = tasks_bag -> 'a final_state -> unit
type 'a t = {
lock: Mutex.t;
cond: Condition.t;
mutable on_res: (tasks_bag -> 'a final_state -> unit) list;
mutable state : 'a final_state option; (* none=running *)
}
let with_ t ~f =
Mutex.lock t.lock;
CCFun.finally ~h:(fun () -> Mutex.unlock t.lock) ~f
(* fixpoint: do all the tasks in [q], until [q] is empty *)
let run_queue_ q =
while not (Queue.is_empty q) do
try Queue.pop q ()
with _ -> ()
done
let apply_in0_ q f = Queue.push f q
let apply_in1_ q f x = Queue.push (fun () -> f x) q
let apply_in2_ q f x y = Queue.push (fun () -> f x y) q
let set_res ~idempotent t res =
(* tasks to do without a lock *)
let q = Queue.create() in
with_ t
~f:(fun () -> match t.state with
| Some Stopped -> ()
| Some _ ->
if not idempotent then failwith "future already done"
| None ->
t.state <- Some res;
Condition.broadcast t.cond;
(* apply each callback, but in [q], outside of the lock *)
List.iter (fun f -> apply_in2_ q f q res) t.on_res;
);
(* now execute the tasks *)
run_queue_ q;
()
let set_done t x = set_res ~idempotent:false t (Done x)
let set_fail t e = set_res ~idempotent:false t (Fail e)
let stop t = set_res ~idempotent:true t Stopped
let get t =
with_ t
~f:(fun () -> match t.state with
| Some s -> s
| None ->
Condition.wait t.cond t.lock;
match t.state with
| None -> assert false
| Some s -> s)
let is_done t = with_ t ~f:(fun () -> t.state <> None)
let on_res t ~f =
let q = Queue.create() in
with_ t
~f:(fun () -> match t.state with
| None -> t.on_res <- f :: t.on_res
| Some res -> apply_in2_ q f q res);
run_queue_ q;
()
let return x = {
on_res=[];
lock=Mutex.create();
cond=Condition.create();
state=Some (Done x);
}
let map f x =
let y = {
on_res=[];
lock=Mutex.create();
cond=Condition.create();
state=None;
} in
on_res x
~f:(fun q res -> match res with
| Stopped -> apply_in1_ q stop y
| Done x ->
apply_in0_ q
(fun () ->
try set_done y (f x)
with e -> set_fail y e)
| Fail e -> apply_in2_ q set_fail y e);
on_res y
~f:(fun q res -> match res with
| Stopped -> apply_in1_ q stop x (* propagate "stop" *)
| _ -> ());
y
type 'a partial_result =
| P_done of 'a
| P_fail of exn
let make ?(on_res=[]) f =
let fut = {
on_res;
lock=Mutex.create();
cond=Condition.create();
state=None;
} in
(* task to run in a thread *)
let compute_and_set () =
let res =
try P_done (f ())
with e -> P_fail e
in
match res with
| P_done x -> set_done fut x
| P_fail e -> set_fail fut e
in
(* spawn thread to run the job *)
let _ = Thread.create compute_and_set () in
fut
end
(*$inject
let pp_fut_int = function
| Fut.Done x -> "done " ^ string_of_int x
| Fut.Stopped -> "stopped"
| Fut.Fail e -> "failed " ^ Printexc.to_string e
*)
(*$= & ~printer:pp_fut_int
(Fut.Done 42) ( \
let t = Fut.make (fun () -> Thread.delay 0.2; 42) in \
Fut.get t)
*)
(*$= & ~printer:pp_fut_int
Fut.Stopped ( \
let t = Fut.make (fun () -> Thread.delay 0.2; 42) in \
Fut.stop t; \
Fut.get t)
*)
(*$= & ~printer:pp_fut_int
(Fut.Fail Exit) ( \
let t = Fut.make (fun () -> Thread.delay 0.2; raise Exit) in \
Fut.get t)
*)
(*$= & ~printer:pp_fut_int
(Fut.Fail Exit) ( \
let t = Fut.make (fun () -> 0) |> Fut.map (fun _ -> raise Exit) in \
Fut.get t)
*)
(*$=
(Fut.Done ()) ( \
let t = Fut.make (fun () -> 0) |> Fut.map (fun _ -> ()) in \
Fut.on_res t ~f:(fun _ -> raise Exit); \
Fut.get t)
*)
type process_status = int
(* make sure that we are a session leader; that is, our children die if we die *)
let ensure_session_leader =
let thunk = lazy (
if not Sys.win32 && not Sys.cygwin
then try ignore (Unix.setsid ()) with _ -> ()
) in
fun () -> Lazy.force thunk
(* create a new active process by running [cmd] and applying [f] on it *)
let popen ?(on_res=[]) cmd ~f =
Utils.debugf ~lock:true ~section 3
"@[<2>start sub-process@ `@[%s@]`@]" (fun k->k cmd);
ensure_session_leader ();
if Sys.unix then ignore (Unix.sigprocmask Unix.SIG_BLOCK [13]); (* block sigpipe *)
(* spawn subprocess *)
let stdout, p_stdout = Unix.pipe () in
let stderr, p_stderr = Unix.pipe () in
let p_stdin, stdin = Unix.pipe () in
Unix.set_close_on_exec stdout;
Unix.set_close_on_exec stderr;
Unix.set_close_on_exec stdin;
let stdout = Unix.in_channel_of_descr stdout in
let stdin = Unix.out_channel_of_descr stdin in
let pid = Unix.create_process
"/bin/sh" [| "/bin/sh"; "-c"; cmd |] p_stdin p_stdout p_stderr in
Unix.close p_stdout;
Unix.close p_stdin;
Unix.close p_stderr;
Utils.debugf ~lock:true ~section 3
"@[<2>pid %d -->@ sub-process `@[%s@]`@]" (fun k -> k pid cmd);
(* cleanup process *)
let cleanup q _ =
Fut.apply_in0_ q
(fun () ->
Utils.debugf ~lock:true ~section 3 "cleanup subprocess %d" (fun k->k pid);
(try Unix.kill pid 15 with _ -> ());
close_out_noerr stdin;
close_in_noerr stdout;
(try Unix.close stderr with _ -> ());
(try Unix.kill pid 9 with _ -> ()); (* just to be sure *)
Utils.debugf ~lock:true ~section 3 "subprocess %d cleaned" (fun k->k pid);
)
in
Fut.make ~on_res:(cleanup :: on_res)
(fun () ->
try
let x = f (stdin, stdout) in
let _, res = Unix.waitpid [Unix.WUNTRACED] pid in
let res = match res with
| Unix.WEXITED i | Unix.WSTOPPED i | Unix.WSIGNALED i -> i
in
Utils.debugf ~lock:true ~section 3
"@[<2>sub-process %d done;@ command was `@[%s@]`@]"
(fun k->k pid cmd);
E.return (x,res)
with e ->
E.fail e
)
(*$T
(try ignore (popen "ls /tmp" ~f:(fun _ -> ()) |> Fut.get); true with _ -> false)
*)
(*$=
(Fut.Done (E.Ok ("coucou\n", 0))) \
(popen "echo coucou" ~f:(fun (_,oc) -> CCIO.read_all oc) |> Fut.get)
*)
type shortcut =
| Shortcut
| No_shortcut
module Task = struct
type ('a, 'res) inner = {
prio: int; (* priority. The lower, the more urgent *)
slice: float; (* fraction of time allotted to the task *)
f: (deadline:float -> unit -> ('a * shortcut) Fut.t);
post: ('a -> 'res);
}
(** Task consisting in running [f arg], obtaining a result. After [f]
returns, [post] is applied to the result to obtain a value
of type ['res] *)
type 'res t =
| Task : ('a, 'res) inner -> 'res t
let of_fut ?(prio=50) ?(slice=1.) f =
Task { prio; f; slice; post=CCFun.id; }
let return ?(prio=100) x short =
let f ~deadline:_ () = Fut.return (x, short) in
of_fut ~prio ~slice:0.001 f
let make ?prio ?slice f =
of_fut ?prio ?slice
(fun ~deadline () -> Fut.make (fun () -> f ~deadline ()))
let compare_prio (Task t1) (Task t2) = Stdlib.compare t1.prio t2.prio
let map ~f (Task t_i) =
Task {t_i with post=(fun x -> f (t_i.post x)); }
end
(* task currently running *)
type 'res running_task =
| R_task :
int * (* unique ID *)
('a, 'res) Task.inner *
('a * shortcut) Fut.t ->
'res running_task
type 'a run_result =
| Res_one of 'a
| Res_list of 'a list
| Res_fail of exn
type 'res pool = {
j: int;
deadline: float;
total_alloted_time: float; (* total time, from creation of pool to deadline *)
mutable task_id: int;
mutable todo: 'res Task.t list;
mutable active : 'res running_task list;
mutable pool_state : 'res run_result;
lock: Mutex.t;
cond: Condition.t;
}
let kill_rtask_ (R_task (_,_,fut)) = Fut.stop fut
let with_ p ~f =
Mutex.lock p.lock;
CCFun.finally ~h:(fun () -> Mutex.unlock p.lock) ~f
(* remove running task by its ID *)
let rec remove_rtask_ id l = match l with
| [] -> []
| R_task (id', _, _) :: tl when id=id' -> tl
| t :: tl -> t :: remove_rtask_ id tl
(* return a running task.
precondition: p is locked
postcondition: new task is started, added to p, and p is unlocked *)
let start_task p t =
let id = p.task_id in
p.task_id <- id + 1;
let (Task.Task t_inner) = t in
(* adjust deadline according to task.slice: take the minimum of
the global deadline, and *)
let deadline =
let now = Unix.gettimeofday() in
let deadline' = now +. (p.total_alloted_time *. t_inner.Task.slice) in
min p.deadline deadline'
in
Utils.debugf ~section 5 "@[<2>actual deadline is %.2f (slice: %.2f)@]"
(fun k->k deadline t_inner.Task.slice);
let fut = t_inner.Task.f ~deadline () in
let r_task = R_task (id, t_inner, fut) in
p.active <- r_task :: p.active;
Mutex.unlock p.lock;
(* ensure that after completion, the task is removed and the pool's
state is updated *)
Fut.on_res fut
~f:(fun _ res ->
with_ p
~f:(fun () ->
p.active <- remove_rtask_ id p.active;
begin match res, p.pool_state with
| Fut.Done (x, No_shortcut), Res_list l ->
begin
try
let y = t_inner.Task.post x in
p.pool_state <- Res_list (y::l)
with e ->
p.pool_state <- Res_fail e
end
| Fut.Done (x, Shortcut), Res_list _ ->
begin
try
let y = t_inner.Task.post x in
p.pool_state <- Res_one y
with e ->
p.pool_state <- Res_fail e
end
| Fut.Fail e, Res_list _ ->
p.pool_state <- Res_fail e;
| Fut.Stopped, _
| _, (Res_one _ | Res_fail _) -> ()
end;
(* awake the main thread, if required *)
Condition.broadcast p.cond)
);
()
(* main function for running threads *)
let rec run_pool pool =
Mutex.lock pool.lock;
match pool.todo, pool.active, pool.pool_state with
| _, _, (Res_one _ | Res_fail _) ->
(* return now *)
Utils.debug ~lock:true ~section 2 "stop active tasks...";
Mutex.unlock pool.lock;
List.iter kill_rtask_ pool.active;
pool.pool_state
| [], [], _ ->
Utils.debug ~lock:true ~section 2 "all tasks done";
Mutex.unlock pool.lock;
pool.pool_state
| task :: todo_tl, _, Res_list _ ->
if List.length pool.active < pool.j
then (
Utils.debugf ~lock:true ~section 2
"start new task (active: %d / j=%d / todo: %d)"
(fun k->k (List.length pool.active) pool.j (List.length todo_tl));
(* run new task *)
pool.todo <- todo_tl;
start_task pool task; (* releases lock *)
) else (
(* wait for something to happen *)
Utils.debugf ~lock:true ~section 2
"waiting (max number of active tasks / todo: %d)..."
(fun k->k (1+List.length todo_tl));
Condition.wait pool.cond pool.lock;
Mutex.unlock pool.lock;
);
run_pool pool
| [], _::_, Res_list _ ->
(* wait for something to happen *)
Utils.debug ~lock:true ~section 2 "waiting for tasks to finish...";
Condition.wait pool.cond pool.lock;
Mutex.unlock pool.lock;
(* check again *)
run_pool pool
let run ~j ~deadline tasks =
if j < 1 then invalid_arg "Scheduling.run";
Utils.debugf ~lock:true ~section 1
"@[<2>%d tasks to run (j=%d)...@]" (fun k->k (List.length tasks) j);
let pool = {
todo=List.sort Task.compare_prio tasks; (* sort by increasing 'prio' *)
active=[];
pool_state = Res_list [];
task_id=0;
deadline;
total_alloted_time=(deadline -. Unix.gettimeofday());
j;
lock=Mutex.create();
cond=Condition.create();
} in
run_pool pool
(*$=
(Res_one 5) ( \
let mk i = Task.make \
(fun ~deadline:_ () -> if i=5 then i, Shortcut else i, No_shortcut) in \
run ~j:3 ~deadline:5. CCList.(1 -- 10 |> map mk))
*)
(*$=
(Res_one 5) ( \
let mk i = Task.make \
(fun ~deadline:_ () -> Thread.delay (float i *. 0.1); \
if i=5 then i, Shortcut else i, No_shortcut) in \
run ~j:3 ~deadline:5. CCList.(1 -- 10 |> map mk))
*)
(*$=
(Res_fail Exit) ( \
let mk i = Task.make (fun ~deadline:_ () -> if i=5 then raise Exit else i, No_shortcut) in \
run ~j:3 ~deadline:5. CCList.(1 -- 10 |> map mk))
*)
(*$=
(Res_list CCList.(1--10)) ( \
let mk i = Task.make (fun ~deadline:_ () -> i, No_shortcut) in \
let res = run ~j:3 ~deadline:5. CCList.(1 -- 10 |> map mk) in \
match res with Res_list l -> Res_list (List.sort Stdlib.compare l) | x->x)
*)