Commit c63f535

Create ExampleJob.java
1 parent efb41d8 commit c63f535

1 file changed: 84 additions, 0 deletions

ExampleJob.java
package com.matthewrathbone.sparktest;

import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;

import org.apache.hadoop.mapred.TextOutputFormat;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.PairFunction;

import scala.Tuple2;

import com.google.common.base.Optional;

public class ExampleJob {
  private JavaSparkContext sc;

  public ExampleJob(JavaSparkContext sc) {
    this.sc = sc;
  }

  // Unwraps the Optional produced by the left outer join back into a plain
  // (key, value) pair. Optional.get() throws if the value is absent, so this
  // assumes every transaction key found a matching customer record.
  public static final PairFunction<Tuple2<Integer, Optional<String>>, Integer, String> KEY_VALUE_PAIRER =
    new PairFunction<Tuple2<Integer, Optional<String>>, Integer, String>() {
      public Tuple2<Integer, String> call(Tuple2<Integer, Optional<String>> a) throws Exception {
        return new Tuple2<Integer, String>(a._1, a._2.get());
      }
    };

  // Left-outer-joins the two pair RDDs on their shared Integer key and keeps
  // each distinct (transaction value, Optional customer value) pair.
  public static JavaRDD<Tuple2<Integer, Optional<String>>> joinData(JavaPairRDD<Integer, Integer> t, JavaPairRDD<Integer, String> u) {
    JavaRDD<Tuple2<Integer, Optional<String>>> leftJoinOutput = t.leftOuterJoin(u).values().distinct();
    return leftJoinOutput;
  }

  public static JavaPairRDD<Integer, String> modifyData(JavaRDD<Tuple2<Integer, Optional<String>>> d) {
    return d.mapToPair(KEY_VALUE_PAIRER);
  }

  // Counts how many entries each key has after the join. The Java API used
  // here declares countByKey as Map<K, Object>, hence the Object values.
  public static Map<Integer, Object> countData(JavaPairRDD<Integer, String> d) {
    Map<Integer, Object> result = d.countByKey();
    return result;
  }

  public JavaPairRDD<String, String> run(String t, String u) {
    // Transactions are tab-separated; column 2 holds the customer id (the
    // join key) and column 1 the value to count.
    JavaRDD<String> transactionInputFile = sc.textFile(t);
    JavaPairRDD<Integer, Integer> transactionPairs = transactionInputFile.mapToPair(new PairFunction<String, Integer, Integer>() {
      public Tuple2<Integer, Integer> call(String s) {
        String[] transactionSplit = s.split("\t");
        return new Tuple2<Integer, Integer>(Integer.valueOf(transactionSplit[2]), Integer.valueOf(transactionSplit[1]));
      }
    });

    // Customers are tab-separated; column 0 holds the customer id and
    // column 3 the attribute carried through the join.
    JavaRDD<String> customerInputFile = sc.textFile(u);
    JavaPairRDD<Integer, String> customerPairs = customerInputFile.mapToPair(new PairFunction<String, Integer, String>() {
      public Tuple2<Integer, String> call(String s) {
        String[] customerSplit = s.split("\t");
        return new Tuple2<Integer, String>(Integer.valueOf(customerSplit[0]), customerSplit[3]);
      }
    });

    // Join, unwrap, and count: for each transaction value, how many distinct
    // customer attributes it was paired with.
    Map<Integer, Object> result = countData(modifyData(joinData(transactionPairs, customerPairs)));

    List<Tuple2<String, String>> output = new ArrayList<>();
    for (Entry<Integer, Object> entry : result.entrySet()) {
      output.add(new Tuple2<>(entry.getKey().toString(), String.valueOf((long) entry.getValue())));
    }

    JavaPairRDD<String, String> outputRdd = sc.parallelizePairs(output);
    return outputRdd;
  }

  public static void main(String[] args) throws Exception {
    JavaSparkContext sc = new JavaSparkContext(new SparkConf().setAppName("SparkJoins").setMaster("local"));
    ExampleJob job = new ExampleJob(sc);
    JavaPairRDD<String, String> outputRdd = job.run(args[0], args[1]);
    outputRdd.saveAsHadoopFile(args[2], String.class, String.class, TextOutputFormat.class);
    sc.close();
  }
}
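For reference, the job takes three arguments: the transactions file path (args[0]), the customers file path (args[1]), and the output directory (args[2]). Below is a hypothetical sketch of input rows matching the column indices the code reads; fields are tab-separated (shown here as wide gaps) and all values are invented for illustration:

transactions file (column 1 = value to count, column 2 = customer id):
1    101    5    2015-03-01
2    102    5    2015-03-01

customers file (column 0 = customer id, column 3 = joined attribute):
5    jo@example.com    EN    GB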
