-
Notifications
You must be signed in to change notification settings - Fork 0
/
ProcessorStarter.java
123 lines (106 loc) · 6.04 KB
/
ProcessorStarter.java
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
package org.middleware.project;
import org.mapdb.DB;
import org.mapdb.DBMaker;
import org.mapdb.Serializer;
import org.middleware.project.Processors.StageProcessor;
import org.middleware.project.topology.*;
import org.middleware.project.utils.Pair;
import java.io.*;
import java.util.Map;
import java.util.Properties;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
public class ProcessorStarter {

    /** File name of the MapDB database that is scanned when a processor crashes. */
    private static final String CRASHED_DB_FILE = "crashedThreads.db";

    /** Name of the map, inside {@link #CRASHED_DB_FILE}, holding the crashed-processor entries. */
    private static final String CRASHED_MAP_NAME = "crashedThreads";

    /**
     * Main class of the server JAR.
     *
     * <p>Loads a properties file and starts the pipeline element selected by its {@code type}
     * property: {@code source}, {@code sink}, {@code stateless} or {@code stateful}. For the
     * two processor types the {@code function} property selects the stage function
     * ({@code flatmap}, {@code filter}, {@code map} or {@code windowaggregate}). If a processor
     * task completes exceptionally, the crash database is scanned and a replacement processor
     * is started for every entry of the same type.
     *
     * <p>An {@link IOException} while reading the properties file is printed and the method
     * returns without starting anything.
     *
     * @param args {@code args[0]} is the path of the properties file
     * @throws IllegalArgumentException if no path is given, or if the {@code type} or
     *                                  {@code function} property is missing or unknown
     * @throws RuntimeException         if {@code type} is not one of the four supported values
     */
    public static void main(String[] args) {
        if (args == null || args.length == 0) {
            throw new IllegalArgumentException("usage: ProcessorStarter <properties-file>");
        }

        final Properties properties = new Properties();
        // try-with-resources: the stream is closed even when load() fails.
        try (InputStream inputStream = new FileInputStream(args[0])) {
            properties.load(inputStream);
        } catch (IOException e) {
            e.printStackTrace();
            return;
        }

        final String type = properties.getProperty("type");
        if (type == null) {
            throw new IllegalArgumentException("missing required property 'type' in " + args[0]);
        }

        if ("source".equals(type)) {
            launch(new Source(properties));
            return;
        }
        if ("sink".equals(type)) {
            launch(new Sink(properties));
            return;
        }
        if (!"stateless".equals(type) && !"stateful".equals(type)) {
            throw new RuntimeException("processor cannot start");
        }

        final StageProcessor function = resolveFunction(properties.getProperty("function"));
        launch(newProcessor(type, properties, function)).exceptionally(throwable -> {
            // Report why the processor died before attempting the restart.
            throwable.printStackTrace();
            restartCrashed(type, properties, function);
            return null;
        });
    }

    /**
     * Maps the value of the {@code function} property to the matching stage function.
     *
     * @param name value of the {@code function} property, may be {@code null}
     * @return the stage function registered under {@code name}
     * @throws IllegalArgumentException if {@code name} is {@code null} or not a known function
     */
    private static StageProcessor resolveFunction(String name) {
        if (name == null) {
            throw new IllegalArgumentException("missing required property 'function'");
        }
        switch (name) {
            case "flatmap":
                return PipelineFunctions.FLATMAPPROCESSOR;
            case "filter":
                return PipelineFunctions.FILTERPROCESSOR;
            case "map":
                return PipelineFunctions.MAPPROCESSOR;
            case "windowaggregate":
                return PipelineFunctions.WINDOWAGGREGATEPROCESSOR;
            default:
                // An explicit failure instead of an assert, which is a no-op unless -ea is set.
                throw new IllegalArgumentException("unknown function: " + name);
        }
    }

    /**
     * Creates the processor task for the given processor type.
     *
     * @param type       either {@code "stateless"} or {@code "stateful"}
     * @param properties configuration handed to the processor
     * @param function   stage function the processor applies
     * @return a new, not yet started, processor task
     */
    private static Runnable newProcessor(String type, Properties properties, StageProcessor function) {
        if ("stateless".equals(type)) {
            return new StatelessAtomicProcessor(properties, function);
        }
        return new StatefulAtomicProcessor(properties, function);
    }

    /**
     * Runs {@code task} asynchronously on a dedicated single-thread pool.
     *
     * <p>The pool is shut down right after submission: the already submitted task still runs
     * to completion, but the worker thread is released afterwards instead of idling forever.
     *
     * @param task the task to run
     * @return a future that completes when {@code task} finishes or fails
     */
    private static CompletableFuture<Void> launch(Runnable task) {
        ExecutorService executor = Executors.newFixedThreadPool(1);
        try {
            return CompletableFuture.runAsync(task, executor);
        } finally {
            executor.shutdown();
        }
    }

    /**
     * Scans the crash database and starts a replacement processor for every entry whose
     * recorded type equals {@code type}; restarted entries are removed from the map.
     * Entries of another type are left in place.
     *
     * @param type       processor type to restart, {@code "stateless"} or {@code "stateful"}
     * @param properties configuration handed to the replacement; its {@code simulateCrash}
     *                   property is set to {@code 0} before each restart
     * @param function   stage function the replacement applies
     */
    private static void restartCrashed(String type, Properties properties, StageProcessor function) {
        DB dbc = DBMaker.fileDB(CRASHED_DB_FILE).fileMmapEnableIfSupported().make();
        try {
            System.out.println(type + " processor restart");
            ConcurrentMap<Integer, Pair<Integer, String>> mapc =
                    dbc.hashMap(CRASHED_MAP_NAME, Serializer.INTEGER, Serializer.JAVA).createOrOpen();
            System.out.println("size of current crashedThreadmap is: " + mapc.size());
            // NOTE(review): entries are removed while iterating, as in the original code;
            // this relies on the MapDB map tolerating concurrent modification - confirm.
            for (Map.Entry<Integer, Pair<Integer, String>> crashed : mapc.entrySet()) {
                System.out.println("restarting processor\t id : " + crashed.getKey() + "\t stagePos: "
                        + crashed.getValue().getKey() + " : " + crashed.getValue().getValue());
                if (type.equals(crashed.getValue().getValue())) {
                    properties.setProperty("simulateCrash", String.valueOf(0));
                    launch(newProcessor(type, properties, function));
                    mapc.remove(crashed.getKey(), crashed.getValue());
                } else {
                    System.out.println("there is a queue of failed processes, scrolling");
                }
            }
        } finally {
            // Close the database even if the scan above throws.
            dbc.close();
        }
        System.out.println("scrolled every entry of crashed threads");
    }
}