
Commit 3663fd0

Elena Akhmatova authored and committed
1 parent 932b0b6 commit 3663fd0

2 files changed: +165 −0 lines changed
SparkJoins.java: 89 additions & 0 deletions
@@ -0,0 +1,89 @@
package main.java.com.matthewrathbone.sparktest;

import java.io.FileNotFoundException;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;

import org.apache.hadoop.mapred.TextOutputFormat;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.PairFunction;

import scala.Tuple2;

import com.google.common.base.Optional;

public class SparkJoins {

  // Unwraps the Optional produced by the left outer join into a plain
  // (key, value) pair. Note this assumes the Optional is present
  // (a._2.isPresent()); get() throws on an absent value.
  @SuppressWarnings("serial")
  public static final PairFunction<Tuple2<Integer, Optional<String>>, Integer, String> KEY_VALUE_PAIRER =
      new PairFunction<Tuple2<Integer, Optional<String>>, Integer, String>() {
        public Tuple2<Integer, String> call(
            Tuple2<Integer, Optional<String>> a) throws Exception {
          return new Tuple2<Integer, String>(a._1, a._2.get());
        }
      };

  // Left outer join of the two pair RDDs; values() drops the join key and
  // distinct() removes duplicate value pairs.
  public static JavaRDD<Tuple2<Integer, Optional<String>>> joinData(JavaPairRDD<Integer, Integer> t, JavaPairRDD<Integer, String> u) {
    JavaRDD<Tuple2<Integer, Optional<String>>> leftJoinOutput = t.leftOuterJoin(u).values().distinct();
    return leftJoinOutput;
  }

  public static JavaPairRDD<Integer, String> modifyData(JavaRDD<Tuple2<Integer, Optional<String>>> d) {
    return d.mapToPair(KEY_VALUE_PAIRER);
  }

  // countByKey returns a local map of key -> count.
  public static Map<Integer, Object> countData(JavaPairRDD<Integer, String> d) {
    Map<Integer, Object> result = d.countByKey();
    return result;
  }

  public static void main(String[] args) throws FileNotFoundException {
    JavaSparkContext sc = new JavaSparkContext(new SparkConf().setAppName("SparkJoins").setMaster("local"));

    // Transactions input: tab-separated; column 2 (the customer id) becomes
    // the key and column 1 becomes the value (a product id in the test data).
    JavaRDD<String> transactionInputFile = sc.textFile(args[0]);
    JavaPairRDD<Integer, Integer> transactionPairs = transactionInputFile.mapToPair(new PairFunction<String, Integer, Integer>() {
      public Tuple2<Integer, Integer> call(String s) {
        String[] transactionSplit = s.split("\t");
        return new Tuple2<Integer, Integer>(Integer.valueOf(transactionSplit[2]), Integer.valueOf(transactionSplit[1]));
      }
    });

    // Customers input: tab-separated; column 0 (the customer id) becomes the
    // key and column 3 (the country) becomes the value.
    JavaRDD<String> customerInputFile = sc.textFile(args[1]);
    JavaPairRDD<Integer, String> customerPairs = customerInputFile.mapToPair(new PairFunction<String, Integer, String>() {
      public Tuple2<Integer, String> call(String s) {
        String[] customerSplit = s.split("\t");
        return new Tuple2<Integer, String>(Integer.valueOf(customerSplit[0]), customerSplit[3]);
      }
    });

    Map<Integer, Object> result = countData(modifyData(joinData(transactionPairs, customerPairs)));

    // countByKey returns a local map, so re-parallelize it before writing
    // the counts back out with saveAsHadoopFile.
    List<Tuple2<Integer, Long>> output = new ArrayList<>();
    for (Entry<Integer, Object> entry : result.entrySet()) {
      output.add(new Tuple2<>(entry.getKey(), (Long) entry.getValue()));
    }

    JavaPairRDD<Integer, Long> output_rdd = sc.parallelizePairs(output);
    // The declared value class must match the Long counts in the RDD.
    output_rdd.saveAsHadoopFile(args[2], Integer.class, Long.class, TextOutputFormat.class);

    sc.close();
  }
}
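
For context, main expects three arguments: a tab-separated transactions file, a tab-separated users file, and an output directory. Below is a minimal sketch of a local driver; the class name, paths, and sample row layouts are illustrative assumptions, not part of this commit, and columns the job does not read are shown as <?>.

// Hypothetical driver, for illustration only; paths are placeholders.
// transactions.txt rows (tab-separated), as parsed by SparkJoins.main:
//   <?>  <product id>  <customer id>  ...      (columns 1 and 2 are read)
// users.txt rows (tab-separated):
//   <customer id>  <?>  <?>  <country>  ...    (columns 0 and 3 are read)
import main.java.com.matthewrathbone.sparktest.SparkJoins;

public class RunSparkJoins {
  public static void main(String[] args) throws Exception {
    SparkJoins.main(new String[] {
        "/tmp/transactions.txt",   // args[0]: transactions input
        "/tmp/users.txt",          // args[1]: customers input
        "/tmp/spark-joins-output"  // args[2]: output dir, must not exist yet
    });
  }
}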
SparkJoinsTest.java: 76 additions & 0 deletions
@@ -0,0 +1,76 @@
package test.java.com.matthewrathbone.sparktest;

import java.io.File;
import java.io.Serializable;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

import main.java.com.matthewrathbone.sparktest.SparkJoins;

import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;

import scala.Tuple2;

import com.google.common.base.Optional;
import com.google.common.io.Files;

public class SparkJoinsTest implements Serializable {
  private transient JavaSparkContext sc;
  private transient File tempDir;

  @Before
  public void setUp() {
    sc = new JavaSparkContext("local", "SparkJoinsTest");
    tempDir = Files.createTempDir();
    tempDir.deleteOnExit();
  }

  @After
  public void tearDown() {
    sc.stop();
    sc = null;
  }

  @Test
  public void sortByKey() {
    // Five transactions as (customer id, product id) pairs; (2, 1) appears twice.
    List<Tuple2<Integer, Integer>> transactions = new ArrayList<>();
    transactions.add(new Tuple2<>(1, 1));
    transactions.add(new Tuple2<>(2, 1));
    transactions.add(new Tuple2<>(2, 1));
    transactions.add(new Tuple2<>(3, 2));
    transactions.add(new Tuple2<>(3, 1));

    List<Tuple2<Integer, String>> users = new ArrayList<>();
    users.add(new Tuple2<>(1, "US"));
    users.add(new Tuple2<>(2, "GB"));
    users.add(new Tuple2<>(3, "FR"));

    JavaPairRDD<Integer, Integer> transactions_rdd = sc.parallelizePairs(transactions);
    JavaPairRDD<Integer, String> users_rdd = sc.parallelizePairs(users);

    JavaRDD<Tuple2<Integer, Optional<String>>> leftJoinOutput = SparkJoins.joinData(transactions_rdd, users_rdd);

    // distinct() collapses the duplicated (2, 1) join result, so five
    // transactions yield four rows.
    Assert.assertEquals(4, leftJoinOutput.count());

    JavaPairRDD<Integer, String> res = SparkJoins.modifyData(leftJoinOutput);
    List<Tuple2<Integer, String>> sortedRes = res.sortByKey().collect();
    // Sorted by product id: product 1 three times, product 2 once.
    Assert.assertEquals(1, sortedRes.get(0)._1.intValue());
    Assert.assertEquals(1, sortedRes.get(1)._1.intValue());
    Assert.assertEquals(1, sortedRes.get(2)._1.intValue());
    Assert.assertEquals(2, sortedRes.get(3)._1.intValue());

    // Product 1 was bought in three distinct countries, product 2 in one.
    Map<Integer, Object> result = SparkJoins.countData(res);
    Assert.assertEquals((long) 3, result.get(1));
    Assert.assertEquals((long) 1, result.get(2));
  }
}
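
One path this test leaves uncovered is a transaction whose customer id has no matching user: leftOuterJoin then produces an absent Optional, and KEY_VALUE_PAIRER's unconditional get() would throw. A sketch of such a test follows, written as a method one might add inside SparkJoinsTest; the method name and sample data are illustrative assumptions, not part of this commit.

  @Test
  public void unmatchedCustomerYieldsAbsentOptional() {
    // Customer 99 has no entry in the users RDD.
    List<Tuple2<Integer, Integer>> transactions = new ArrayList<>();
    transactions.add(new Tuple2<>(99, 1));

    List<Tuple2<Integer, String>> users = new ArrayList<>();
    users.add(new Tuple2<>(1, "US"));

    JavaRDD<Tuple2<Integer, Optional<String>>> joined = SparkJoins.joinData(
        sc.parallelizePairs(transactions), sc.parallelizePairs(users));

    // The left side survives the join, but the country Optional is absent,
    // so KEY_VALUE_PAIRER's get() would fail on this row.
    Tuple2<Integer, Optional<String>> row = joined.collect().get(0);
    Assert.assertEquals(1, row._1.intValue());
    Assert.assertFalse(row._2.isPresent());
  }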
