-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathTopologyRunner.java
211 lines (180 loc) · 8.76 KB
/
TopologyRunner.java
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
package uniko.west.topology;
import java.io.File;
import java.io.FileInputStream;
import java.io.IOException;
import java.util.Properties;
import java.util.logging.Level;
import java.util.logging.Logger;
import com.rabbitmq.client.ConnectionFactory;
import backtype.storm.Config;
import backtype.storm.StormSubmitter;
import backtype.storm.generated.AlreadyAliveException;
import backtype.storm.generated.InvalidTopologyException;
import backtype.storm.topology.BoltDeclarer;
import backtype.storm.topology.IRichSpout;
import backtype.storm.topology.SpoutDeclarer;
import backtype.storm.topology.TopologyBuilder;
import io.latent.storm.rabbitmq.Declarator;
import io.latent.storm.rabbitmq.config.ConnectionConfig;
import io.latent.storm.rabbitmq.config.ConsumerConfig;
import io.latent.storm.rabbitmq.config.ConsumerConfigBuilder;
import uniko.west.topology.bolts.DiscussionTreeBolt;
import uniko.west.topology.bolts.RoleAnalysisBolt;
import util.ExampleSocialMediaAMQPSpout;
import util.ExampleSocialMediaStormDeclarator;
import util.JacksonScheme;
public class TopologyRunner {
public static final String topologyFolderName = "roleAnalysisTopology";
public static String topologyName;
public static void main(String[] args) {
TopologyBuilder builder;
// Storm Spouts
IRichSpout stormExampleSocialMediaAMQPSpout;
SpoutDeclarer spoutDeclarer;
// Storm RabbitMQ queue declarator
Declarator declarator;
String spoutId = "rabbitmqSpout";
// Main Storm Social Media Properties file
File configFile = new File(args[0]);
File pServerConfigFile = new File(args[1]);
// URL of restlet service
String restletURL = args[2];
String rmqExchange = args[3];
String nimbusHost = args[4];
TopologyRunner.topologyName = rmqExchange + "_" + topologyFolderName;
// Create Java properties file from the passed configuration file
Properties properties = new Properties();
Properties pServerConfig = new Properties();
try {
properties.load(new FileInputStream(configFile));
pServerConfig.load(new FileInputStream(pServerConfigFile));
} catch (IOException ex) {
Logger.getLogger(TopologyRunner.class.getName()).log(Level.SEVERE, null, ex);
}
// Get all the needed RabbitMQ connection properties from the
// configuration file
String rmqHost = properties.getProperty("rmqhost", "localhost");
int rmqPort = Integer.parseInt(properties.getProperty("rmqport", "5672"));
String rmqUsername = properties.getProperty("rmqusername", "guest");
String rmqPassword = properties.getProperty("rmqpassword");
int rmqHeartBeat = Integer.parseInt(properties.getProperty("rmqheartbeat", "10"));
String rmqQueueName = properties.getProperty("rmqqueuename", "test");
String rmqExchangeType = properties.getProperty("rmqexchangetype", "topic");
String rmqRouting = properties.getProperty("rmqrouting", "test-routing");
final boolean rmqPersistence = Boolean.parseBoolean(properties.getProperty("persistence", "false"));
String emitFieldsId = properties.getProperty("emit_fields_id", "object");
// Get Storm Topology configuration parameters
boolean topologyDebug = Boolean.valueOf(properties.getProperty("topology_debug", "false"));
// Get Storm Spout configuration parameters
boolean spoutDebug = Boolean.valueOf(properties.getProperty("spout_debug", "false"));
int rmqPrefetch = Integer.parseInt(properties.getProperty("spout_rmqprefetch", "200"));
int maxSpoutPending = Integer.parseInt(properties.getProperty("spout_max_spout_pending", "200"));
String pServerHostName = pServerConfig.getProperty("hostName");
String pServerMode = pServerConfig.getProperty("mode");
String pServerClientName = pServerConfig.getProperty("clientName");
String pServerClientPasswd = pServerConfig.getProperty("clientPasswd");
boolean pServerInitData = Boolean.valueOf(pServerConfig.getProperty("initServerData"));
JacksonScheme jsonScheme = new JacksonScheme();
/*
* Create RabbitMQ connection configuration Documentation (no API, just
* an example of usage):
* https://github.com/ppat/storm-rabbitmq/blob/master/README.md (search
* for "RabbitMQ Spout")
*/
ConnectionConfig connectionConfig = new ConnectionConfig(rmqHost, rmqPort, rmqUsername, rmqPassword,
ConnectionFactory.DEFAULT_VHOST, rmqHeartBeat);
Logger.getLogger(TopologyRunner.class.getName()).log(Level.INFO,
"Initialised RabbitMQ connection configuration object.");
/*
* Create Storm Spout configuration builder Documentation (no API, just
* an example of usage):
* https://github.com/ppat/storm-rabbitmq/blob/master/README.md (search
* for "RabbitMQ Spout")
*/
// Customer configuration builder
ConsumerConfigBuilder spoutConfigBuilder = new ConsumerConfigBuilder();
spoutConfigBuilder.connection(connectionConfig);
spoutConfigBuilder.queue(rmqQueueName);
spoutConfigBuilder.prefetch(rmqPrefetch);
spoutConfigBuilder.requeueOnFail();
Logger.getLogger(TopologyRunner.class.getName()).log(Level.INFO, "Initialised Spout configuration builder.");
/*
* Build Storm spout configuration Documentation (no API, just an
* example of usage):
* https://github.com/ppat/storm-rabbitmq/blob/master/README.md (search
* for "RabbitMQ Spout")
*/
ConsumerConfig spoutConfig = spoutConfigBuilder.build();
Logger.getLogger(TopologyRunner.class.getName()).log(Level.INFO, "Initialised Spout configuration builder.");
/*
* Create a AMQP Declarator (will declare queue if it does not exist on
* the time of the Storm launch) Documentation (no API, just an example
* of usage):
* https://github.com/ppat/storm-rabbitmq/blob/master/README.md (search
* for "Declarator")
*/
declarator = new ExampleSocialMediaStormDeclarator(rmqExchange, rmqExchangeType, rmqRouting, rmqQueueName,
rmqPersistence);
/*
* Initialise Social Media Spout API:
* http://nathanmarz.github.io/storm/doc-0.8.1/index.html (search for
* "IRichSpout")
*/
stormExampleSocialMediaAMQPSpout = new ExampleSocialMediaAMQPSpout(jsonScheme, declarator);
Logger.getLogger(TopologyRunner.class.getName()).log(Level.INFO,
"Initialised AMQP Spout object on exchange " + rmqExchange);
/*
* Create a simple STORM topology configuration file Documentation (no
* API, just an example of usage):
* https://github.com/ppat/storm-rabbitmq/blob/master/README.md (search
* for "Config")
*/
Config conf = new Config();
conf.put(Config.NIMBUS_HOST, nimbusHost);
conf.put(Config.TOPOLOGY_DEBUG, topologyDebug);
conf.setDebug(topologyDebug);
Logger.getLogger(TopologyRunner.class.getName()).log(Level.INFO, "Initialised main example Storm confuration.");
/*
* Initialise Storm Topology API:
* http://nathanmarz.github.io/storm/doc-0.8.1/index.html (search for
* "TopologyBuilder")
*/
builder = new TopologyBuilder();
/*
* Define a new Spout in the topology API:
* http://nathanmarz.github.io/storm/doc-0.8.1/index.html (search for
* "SpoutDeclarer")
*/
spoutDeclarer = builder.setSpout(spoutId, stormExampleSocialMediaAMQPSpout);
Logger.getLogger(TopologyRunner.class.getName()).log(Level.INFO,
"Declared AMQP Spout to the example Storm topology.");
// Add configuration to the StoputDeclarer
spoutDeclarer.addConfigurations(spoutConfig.asMap());
/*
* Explanation taken from: https://github.com/ppat/storm-rabbitmq Set
* MaxSpoutPending value to the same value as RabbitMQ pre-fetch count
* (set initially in in the ConsumerConfig above). It is possible to
* tune them later separately, but MaxSpoutPending should always be <=
* Prefetch
*/
spoutDeclarer.setMaxSpoutPending(maxSpoutPending);
spoutDeclarer.setDebug(spoutDebug);
BoltDeclarer boltDeclarer;
DiscussionTreeBolt discussionTreeBolt = new DiscussionTreeBolt(emitFieldsId);
boltDeclarer = builder.setBolt("discussionTeeBoltId", discussionTreeBolt);
boltDeclarer.shuffleGrouping(spoutId);
RoleAnalysisBolt roleAnalysisBolt = new RoleAnalysisBolt(emitFieldsId, pServerHostName, pServerMode,
pServerClientName, pServerClientPasswd, pServerInitData);
boltDeclarer = builder.setBolt("roleAnalyisBoltId", roleAnalysisBolt);
boltDeclarer.shuffleGrouping("discussionTeeBoltId");
try {
// Submit the topology to the distribution cluster that will be
// defined in Storm client configuration file or via cmd as a
// parameter ( e.g. nimbus.host=localhost )
StormSubmitter.submitTopology(topologyName, conf, builder.createTopology());
} catch (AlreadyAliveException | InvalidTopologyException ex) {
Logger.getLogger(TopologyRunner.class.getName()).log(Level.SEVERE, null, ex);
}
Logger.getLogger(TopologyRunner.class.getName()).log(Level.INFO, "Submitted topology : " + topologyName);
}
}