Commit c4a9a30

Add files via upload
1 parent 19957d4 commit c4a9a30

File tree

3 files changed: +208 -0 lines changed

MySQLoHiveDataIngestion.java

Lines changed: 127 additions & 0 deletions
@@ -0,0 +1,127 @@
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.ResultSet;
import java.sql.ResultSetMetaData;
import java.sql.SQLException;
import java.sql.Statement;
import java.util.List;
import java.util.Properties;

import org.apache.log4j.Logger;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.sql.DataFrame;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.hive.HiveContext;

public class MySQLoHiveDataIngestion {

    private static final Logger logger = Logger.getLogger(MySQLoHiveDataIngestion.class.getName());
    private static Connection connection = null;

    public static void main(String[] args) throws ClassNotFoundException, SQLException {
        logger.info("Calling main method");

        // The master URL is supplied by spark-submit, so only the application name is set here
        logger.info("Creating JavaSparkContext and HiveContext");
        SparkConf sparkConf = new SparkConf().setAppName("MySQLoHiveDataIngestion");
        JavaSparkContext javaSparkContext = new JavaSparkContext(sparkConf);
        HiveContext hiveContext = new HiveContext(javaSparkContext);

        // JDBC connection properties for the source MySQL database
        Properties props = new Properties();
        props.setProperty("user", "root");
        props.setProperty("password", "root");

        // Read the MySQL table into a Spark DataFrame over JDBC
        DataFrame result = hiveContext.read().jdbc("jdbc:mysql://localhost:3306/hadoop", "transaction", props);
        result.printSchema();

        // Derive the Hive DDL from the MySQL table metadata and create the target table
        connection = getConnection();
        String createTable = MySQLoHiveDataIngestion.getHiveTableQuery();
        hiveContext.sql(createTable);

        // Collect the source rows on the driver (only reasonable for small tables)
        List<Row> rows = result.collectAsList();
        logger.info("Rows read from MySQL: " + rows.size());
    }

    // Opens a plain JDBC connection to the source MySQL database
    public static Connection getConnection() throws ClassNotFoundException, SQLException {
        Class.forName("com.mysql.jdbc.Driver");
        return DriverManager.getConnection("jdbc:mysql://localhost:3306/hadoop", "root", "root");
    }

    // Builds a Hive CREATE TABLE statement from the MySQL table's ResultSet metadata
    public static String getHiveTableQuery() throws SQLException {
        Statement st = connection.createStatement();
        ResultSet rs = st.executeQuery("SELECT * FROM transaction");
        ResultSetMetaData rsMetaData = rs.getMetaData();

        StringBuilder createTable = new StringBuilder("CREATE TABLE IF NOT EXISTS transaction (");
        for (int i = 1; i <= rsMetaData.getColumnCount(); i++) {
            logger.info("Column Name " + rsMetaData.getColumnName(i));
            logger.info("Column DataType " + rsMetaData.getColumnTypeName(i));
            if (i > 1) {
                createTable.append(", ");
            }
            createTable.append(rsMetaData.getColumnName(i))
                    .append(" ")
                    .append(getHiveDataType(rsMetaData.getColumnTypeName(i)));
        }
        createTable.append(") STORED AS PARQUET");

        logger.info("Create Table Statement : " + createTable);
        connection.close();
        return createTable.toString();
    }

    // Maps a JDBC/MySQL column type name to the corresponding Hive data type
    public static String getHiveDataType(String dataType) {
        if (dataType.equalsIgnoreCase("STRING") || dataType.equalsIgnoreCase("CHAR")
                || dataType.equalsIgnoreCase("VARCHAR2") || dataType.equalsIgnoreCase("VARCHAR")) {
            return "STRING";
        } else if (dataType.equalsIgnoreCase("INT")) {
            return "INT";
        } else if (dataType.equalsIgnoreCase("LONG") || dataType.equalsIgnoreCase("BIGINT")) {
            return "BIGINT";
        } else if (dataType.equalsIgnoreCase("FLOAT")) {
            return "FLOAT";
        } else if (dataType.equalsIgnoreCase("DOUBLE")) {
            return "DOUBLE";
        } else if (dataType.equalsIgnoreCase("BOOLEAN")) {
            return "TINYINT";
        } else if (dataType.equalsIgnoreCase("BYTE")) {
            return "SMALLINT";
        } else if (dataType.equalsIgnoreCase("DECIMAL")) {
            return "DECIMAL";
        } else if (dataType.equalsIgnoreCase("DATE")) {
            return "DATE";
        }
        // Fall back to STRING for any type not mapped above so the generated DDL stays valid
        logger.warn("Unmapped source type '" + dataType + "', defaulting to STRING");
        return "STRING";
    }
}
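
Note that the class builds and runs the CREATE TABLE statement, but the rows read from MySQL are only collected on the driver and never written into the new Hive table. A minimal sketch of that final step, assuming the Spark 1.6 DataFrameWriter API and the result DataFrame from main above, could be appended at the end of main:

// Hypothetical last step in main(): append the MySQL rows into the Hive table created above.
// insertInto matches columns by position, so the JDBC schema must line up with the generated DDL.
result.write()
      .mode(org.apache.spark.sql.SaveMode.Append)
      .insertInto("transaction");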

Spark Submit

Lines changed: 11 additions & 0 deletions
@@ -0,0 +1,11 @@
# Local mode: the MySQL JDBC driver is shipped with --jars
spark-submit --master local[*] --jars /usr/share/java/mysql-connector-java.jar --class MySQLoHiveDataIngestion MySQLToHive-0.0.1-SNAPSHOT.jar

# YARN cluster mode
spark-submit --master yarn \
--deploy-mode cluster \
--driver-memory 4g \
--executor-memory 2g \
--executor-cores 4 \
--queue default \
--class MySQLoHiveDataIngestion MySQLToHive-0.0.1-SNAPSHOT.jar

# Point spark-submit at the Hadoop client configuration before using --master yarn
export HADOOP_CONF_DIR=$HADOOP_HOME/etc/hadoop
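
In yarn cluster mode the driver also runs inside the cluster, so the MySQL connector jar has to be shipped with the application just as in the local example. Assuming the same connector path as above, the --jars flag can simply be added to the YARN command:

spark-submit --master yarn --deploy-mode cluster \
--jars /usr/share/java/mysql-connector-java.jar \
--class MySQLoHiveDataIngestion MySQLToHive-0.0.1-SNAPSHOT.jar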

pom.xml

Lines changed: 70 additions & 0 deletions
@@ -0,0 +1,70 @@
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
  <modelVersion>4.0.0</modelVersion>
  <groupId>MySQLToHive</groupId>
  <artifactId>MySQLToHive</artifactId>
  <version>0.0.1-SNAPSHOT</version>

  <dependencies>
    <!-- https://mvnrepository.com/artifact/org.apache.spark/spark-core -->
    <dependency>
      <groupId>org.apache.spark</groupId>
      <artifactId>spark-core_2.11</artifactId>
      <version>1.6.2</version>
    </dependency>

    <!-- https://mvnrepository.com/artifact/org.apache.spark/spark-sql -->
    <dependency>
      <groupId>org.apache.spark</groupId>
      <artifactId>spark-sql_2.11</artifactId>
      <version>1.6.2</version>
    </dependency>

    <!-- https://mvnrepository.com/artifact/org.apache.spark/spark-hive -->
    <dependency>
      <groupId>org.apache.spark</groupId>
      <artifactId>spark-hive_2.11</artifactId>
      <version>1.6.2</version>
    </dependency>

    <!-- https://mvnrepository.com/artifact/mysql/mysql-connector-java -->
    <dependency>
      <groupId>mysql</groupId>
      <artifactId>mysql-connector-java</artifactId>
      <version>5.1.38</version>
    </dependency>

    <!-- https://mvnrepository.com/artifact/org.apache.hive/hive-jdbc -->
    <dependency>
      <groupId>org.apache.hive</groupId>
      <artifactId>hive-jdbc</artifactId>
      <version>1.2.2</version>
    </dependency>

    <!-- https://mvnrepository.com/artifact/log4j/log4j -->
    <dependency>
      <groupId>log4j</groupId>
      <artifactId>log4j</artifactId>
      <version>1.2.17</version>
    </dependency>

    <!-- https://mvnrepository.com/artifact/commons-lang/commons-lang -->
    <dependency>
      <groupId>commons-lang</groupId>
      <artifactId>commons-lang</artifactId>
      <version>2.6</version>
    </dependency>

    <!-- https://mvnrepository.com/artifact/com.databricks/spark-avro -->
    <!-- Scala 2.11 build to match the spark-core/sql/hive artifacts above -->
    <dependency>
      <groupId>com.databricks</groupId>
      <artifactId>spark-avro_2.11</artifactId>
      <version>2.0.1</version>
    </dependency>
  </dependencies>
</project>
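
The jar passed to spark-submit above is the artifact this pom produces with the default jar packaging; assuming the cluster already provides the Spark and Hive classes, only the MySQL connector has to be supplied separately via --jars, so a plain Maven build is enough:

mvn clean package
# produces target/MySQLToHive-0.0.1-SNAPSHOT.jar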
