-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathsumDrugs.java
161 lines (122 loc) · 4.77 KB
/
sumDrugs.java
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
import java.io.File;
import java.io.IOException;
import java.util.Map;
import java.util.TreeMap;
import org.apache.commons.io.FileUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.DoubleWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
public class sumDrugs{
// Field separator: used to split the comma-separated input records and to join
// the parts of the emitted keys and of the final output lines.
static String delim_CSV = ",";
// NOTE(review): not referenced anywhere in the visible part of this file;
// presumably a leftover from a top-K job - confirm before removing.
static final String TOP_K = "10";
/**
 * Job 1 mapper: turns one CSV record into one {@code "<year>,<state>" -> amount}
 * pair per year.
 *
 * <p>Two record layouts are accepted, told apart by whether the second field
 * parses as a number:
 * <ul>
 *   <li>not numeric: {@code <name>,<state>,<amount>,<amount>,...}</li>
 *   <li>numeric: {@code <name>,<amount>,<amount>,...}; the state is then
 *       reported as {@code "national"}</li>
 * </ul>
 * The first amount column is attributed to {@link #FIRST_YEAR}, the next one to
 * the following year, and so on up to (but excluding) {@link #END_YEAR}.
 */
public static class Map1 extends Mapper<Object, Text, Text, DoubleWritable> {
    /** Year of the first amount column (inclusive). */
    private static final int FIRST_YEAR = 1992;
    /** Year after the last amount column (exclusive). */
    private static final int END_YEAR = 2019;

    // Reused across calls: context.write() serializes the pair immediately,
    // so there is no need to allocate new objects for every emitted record.
    private final Text outKey = new Text();
    private final DoubleWritable outValue = new DoubleWritable();

    /**
     * Emits one pair per year for the given input line.
     *
     * @param key     input key supplied by the input format (unused)
     * @param value   one line of the CSV input
     * @param context sink for the {@code "<year>,<state>" -> amount} pairs
     * @throws IOException if the record has too few fields or an amount is not a number
     */
    public void map(Object key, Text value, Context context) throws IOException, InterruptedException {
        String record = value.toString();
        if (record.trim().isEmpty()) {
            return; // blank lines (e.g. a trailing newline) carry no data
        }

        String[] fields = record.split(delim_CSV);
        if (fields.length < 2) {
            throw new IOException("Malformed record, expected at least 2 fields: " + record);
        }

        // A numeric second field means there is no state column.
        boolean national = isNumeric(fields[1]);
        String state = national ? "national" : fields[1];
        int firstAmount = national ? 1 : 2;

        int yearCount = END_YEAR - FIRST_YEAR;
        if (fields.length - firstAmount < yearCount) {
            throw new IOException("Malformed record, expected " + yearCount + " amounts but found "
                    + Math.max(0, fields.length - firstAmount) + ": " + record);
        }

        for (int i = 0; i < yearCount; i++) {
            double amount;
            try {
                amount = Double.parseDouble(fields[firstAmount + i]);
            } catch (NumberFormatException e) {
                throw new IOException("Non-numeric amount for year " + (FIRST_YEAR + i) + " in record: " + record, e);
            }
            outKey.set((FIRST_YEAR + i) + delim_CSV + state);
            outValue.set(amount);
            context.write(outKey, outValue);
        }
    }

    /** Returns true when {@code field} is accepted by {@link Double#parseDouble(String)}. */
    private static boolean isNumeric(String field) {
        try {
            Double.parseDouble(field);
            return true;
        } catch (NumberFormatException e) {
            return false;
        }
    }
}
/**
 * Job 1 reducer: sums every amount that shares the same {@code "<year>,<state>"} key.
 */
public static class Red1 extends Reducer<Text, DoubleWritable, Text, DoubleWritable> {
    /**
     * Writes the key together with the sum of its values.
     *
     * @param key     the {@code "<year>,<state>"} key produced by the mapper
     * @param values  all amounts emitted for that key
     * @param context sink for the {@code key -> total} pair
     */
    public void reduce(Text key, Iterable<DoubleWritable> values, Context context) throws IOException, InterruptedException {
        // Primitive accumulator and get(): avoids boxing and a needless
        // double -> String -> double round trip on every value.
        double total = 0.0;
        for (DoubleWritable val : values) {
            total += val.get();
        }
        context.write(key, new DoubleWritable(total));
    }
}
/**
 * Job 2 mapper: regroups the per-year totals written by Job 1 by state.
 *
 * <p>Each input line is expected to look like {@code <year>,<state><TAB><amount>}:
 * Job 1 writes its pairs as text, and the default text output format separates
 * key and value with a TAB character. The year is carried along inside the
 * emitted value so that the reducer can restore chronological order.
 */
public static class Map2 extends Mapper<Object, Text, Text, Text> {
    /**
     * Emits {@code <state> -> "<year>,<amount>"} for one line of Job 1 output.
     *
     * @param key     input key supplied by the input format (unused)
     * @param value   one line of Job 1 output
     * @param context sink for the regrouped pair
     * @throws IOException if the line does not have the expected layout
     */
    public void map(Object key, Text value, Context context) throws IOException, InterruptedException {
        String record = value.toString();
        if (record.trim().isEmpty()) {
            return; // nothing to regroup
        }

        String[] keyAndAmount = record.split("\t", 2);
        if (keyAndAmount.length < 2) {
            throw new IOException("Expected '<year>,<state><TAB><amount>' but got: " + record);
        }
        String[] yearAndState = keyAndAmount[0].split(delim_CSV, 2);
        if (yearAndState.length < 2) {
            throw new IOException("Expected '<year>,<state>' as key but got: " + keyAndAmount[0]);
        }

        String year = yearAndState[0];
        String state = yearAndState[1];
        String amount = keyAndAmount[1];
        context.write(new Text(state), new Text(year + delim_CSV + amount));
    }
}
/**
 * Job 2 reducer: writes one CSV line per state, {@code <state>,<amount>,<amount>,...},
 * with the amounts in ascending year order.
 */
public static class Red2 extends Reducer<Text, Text, Text, NullWritable> {
    /**
     * Builds the output line for one state.
     *
     * @param key     the state
     * @param values  {@code "<year>,<amount>"} entries for that state, in no particular order
     * @param context sink for the finished line (the value is a {@link NullWritable})
     * @throws IOException if a value does not have the {@code "<year>,<amount>"} layout
     */
    public void reduce(Text key, Iterable<Text> values, Context context) throws IOException, InterruptedException {
        // The framework does not guarantee the order in which the values of a
        // key arrive, so they are sorted by year here instead of being
        // concatenated in arrival order.
        Map<Integer, String> amountByYear = new TreeMap<Integer, String>();
        for (Text val : values) {
            String entry = val.toString();
            String[] yearAndAmount = entry.split(delim_CSV, 2);
            if (yearAndAmount.length < 2) {
                throw new IOException("Expected '<year>,<amount>' but got: " + entry);
            }
            try {
                amountByYear.put(Integer.valueOf(yearAndAmount[0]), yearAndAmount[1]);
            } catch (NumberFormatException e) {
                throw new IOException("Non-numeric year in: " + entry, e);
            }
        }

        StringBuilder row = new StringBuilder(key.toString());
        for (String amount : amountByYear.values()) {
            row.append(delim_CSV).append(amount);
        }
        context.write(new Text(row.toString()), NullWritable.get());
    }
}
/**
 * Runs the two chained jobs that sum a drug trend file per year and state.
 *
 * <p>Job 1 reads {@code res/out_cluster/<trend_path>} and writes its result to the
 * intermediate directory {@code aux_sumDrugs}; Job 2 reads that directory and
 * writes the final result to {@code res/out_cluster/topK_/sum_<trend_path>}.
 * Both output directories are deleted before the jobs start. The process exits
 * with status 0 when both jobs succeed, 1 when either job fails and 2 when the
 * argument is missing.
 *
 * @param args {@code args[0]} is {@code <trend_path>}, the name of the trend input
 *             inside {@code res/out_cluster/}; any further arguments are ignored
 * @throws Exception if a directory cannot be deleted or a job cannot be set up or run
 */
public static void main(String[] args) throws Exception {
    if (args.length < 1) {
        System.err.println("Usage: sumDrugs <trend_path>");
        System.exit(2);
    }

    // folders to organize the directory
    String resF = "res/";
    String outF = resF + "out_cluster/";
    String inPath = outF + args[0];
    String outPath = outF + "topK_/" + "sum_" + args[0];
    String pathJ1 = "aux_sumDrugs";

    Configuration conf = new Configuration();

    // The output directories are removed first so that stale results from a
    // previous run are not left in place.
    // NOTE(review): these deletes go through java.io.File, i.e. the local file
    // system; if the job paths resolve to another file system (e.g. HDFS) the
    // old directories would not be removed - confirm.
    FileUtils.deleteDirectory(new File(pathJ1));
    FileUtils.deleteDirectory(new File(outPath));

    /////////////////////// JOB 1 /////////////////////////////////////////////////
    Job job1 = Job.getInstance(conf, "sum drugs j1");
    job1.setJarByClass(sumDrugs.class);
    job1.setMapperClass(Map1.class);
    job1.setReducerClass(Red1.class);
    job1.setMapOutputKeyClass(Text.class);
    job1.setMapOutputValueClass(DoubleWritable.class);
    // Declare what Red1 actually writes, as is done for Job 2 below.
    job1.setOutputKeyClass(Text.class);
    job1.setOutputValueClass(DoubleWritable.class);
    FileInputFormat.addInputPath(job1, new Path(inPath));
    FileOutputFormat.setOutputPath(job1, new Path(pathJ1));

    // Job 2 consumes Job 1's output, so there is no point in starting it
    // when Job 1 did not complete successfully.
    if (!job1.waitForCompletion(true)) {
        System.exit(1);
    }

    /////////////////////// JOB 2 /////////////////////////////////////////////////
    Job job2 = Job.getInstance(conf, "sum drugs j2");
    job2.setJarByClass(sumDrugs.class);
    job2.setMapperClass(Map2.class);
    job2.setReducerClass(Red2.class);
    job2.setMapOutputKeyClass(Text.class);
    job2.setMapOutputValueClass(Text.class);
    job2.setOutputKeyClass(Text.class);
    job2.setOutputValueClass(NullWritable.class);
    FileInputFormat.addInputPath(job2, new Path(pathJ1));
    FileOutputFormat.setOutputPath(job2, new Path(outPath));

    System.exit(job2.waitForCompletion(true) ? 0 : 1);
}
}