SPARK-22833 [Improvement] in SparkHive Scala example - change in comments made
chetkhatri committed Dec 21, 2017
commit 69a4145140b4b8980f3893b677985b49f2da12a7
@@ -102,58 +102,32 @@ object SparkHiveExample {
// | 5| val_5| 5| val_5|
// ...

/*
* Save DataFrame to a Hive managed table in Parquet format.
* 1. Create the Hive database/schema with an explicit HDFS location if you want to specify one; otherwise the
* default warehouse location will be used to store the Hive table data.
* Ex: CREATE DATABASE IF NOT EXISTS database_name LOCATION hdfs_path;
* You don't have to give a location explicitly for each table; every table under the specified schema will be
* stored at the location given when the schema was created.
* 2. Create a Hive managed table with 'Parquet' as the storage format.
* Ex: CREATE TABLE records(key int, value string) STORED AS PARQUET;
*/
val hiveTableDF = sql("SELECT * FROM records").toDF()
// Save DataFrame to Hive Managed table as Parquet format
Member:
Managed -> managed

Contributor Author:
@HyukjinKwon Thanks for highlighting; fixed.

val hiveTableDF = sql("SELECT * FROM records")
hiveTableDF.write.mode(SaveMode.Overwrite).saveAsTable("database_name.records")
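// Hypothetical sketch, not part of this commit: in a real run the database referenced above would be
// created before the saveAsTable call, mirroring the CREATE DATABASE example in the comment block;
// 'database_name' and the HDFS location are illustrative placeholders.
sql("CREATE DATABASE IF NOT EXISTS database_name LOCATION '/user/hive/warehouse/database_name.db'")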

/*
* Save DataFrame to a Hive external table in Spark-compatible Parquet format.
* 1. Create the Hive external table with Parquet as the storage format.
* Ex: CREATE EXTERNAL TABLE records(key int, value string) STORED AS PARQUET;
* Since we are not explicitly providing the Hive database location, it automatically uses the default warehouse
* location given by 'spark.sql.warehouse.dir' when the SparkSession is created with enableHiveSupport().
* For example, if '/user/hive/warehouse/' is the Hive warehouse location, schema directories are created under it,
* such as '/user/hive/warehouse/database_name.db' and '/user/hive/warehouse/database_name'.
*/

// to make Hive parquet format compatible with spark parquet format
Member:
parquet -> Parquet

Member:
spark -> Spark

Contributor Author:
@HyukjinKwon Thanks for highlighting; fixed.

spark.sqlContext.setConf("spark.sql.parquet.writeLegacyFormat", "true")

// Multiple parquet files could be created, according to the volume of data, under the given directory.
Member:
parquet -> Parquet.

Contributor Author:
@HyukjinKwon Thanks for highlighting; fixed.

val hiveExternalTableLocation = s"/user/hive/warehouse/database_name.db/records"
val hiveExternalTableLocation = "/user/hive/warehouse/database_name.db/records"

// Save DataFrame to Hive External table as compatible parquet format
Member:
parquet -> Parquet.

Contributor Author:
@HyukjinKwon Thanks for highlighting; fixed.

hiveTableDF.write.mode(SaveMode.Overwrite).parquet(hiveExternalTableLocation)
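// Hypothetical sketch, not part of this commit: once the Parquet files are written, a matching external
// table can be declared over that location, following the CREATE EXTERNAL TABLE example above; the
// 'database_name.records_ext' name is an illustrative placeholder.
sql("CREATE EXTERNAL TABLE IF NOT EXISTS database_name.records_ext(key INT, value STRING) " +
  s"STORED AS PARQUET LOCATION '$hiveExternalTableLocation'")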

// turn on flag for Dynamic Partitioning
Member:
turn -> Turn.

Contributor Author:
@HyukjinKwon Thanks for highlighting; fixed.

spark.sqlContext.setConf("hive.exec.dynamic.partition", "true")
spark.sqlContext.setConf("hive.exec.dynamic.partition.mode", "nonstrict")

// You can create partitions in the Hive table, so downstream queries run much faster.
hiveTableDF.write.mode(SaveMode.Overwrite).partitionBy("key")
.parquet(hiveExternalTableLocation)
/*
If the data volume is very large, every partition may end up with many small files, which can hurt
downstream query performance due to file I/O, bandwidth I/O, network I/O, and disk I/O.
To improve performance you can create a single Parquet file under each partition directory by using 'repartition'
on the partition key of the Hive table. When you add partitions to the table, the table DDL changes.
Ex: CREATE TABLE records(value string) PARTITIONED BY(key int) STORED AS PARQUET;
*/

// reduce number of files for each partition by repartition
Member:
reduce -> Reduce.

Contributor Author:
@HyukjinKwon Thanks for highlighting; fixed.

hiveTableDF.repartition($"key").write.mode(SaveMode.Overwrite)
Contributor:
This is not a standard usage; let's not put it in the example.

Contributor Author:
@cloud-fan Removed all comments; as discussed with @srowen, it really makes sense to have this in the docs, with the inconsistency removed.

.partitionBy("key").parquet(hiveExternalTableLocation)

/*
You can also use coalesce to control the number of files under each partition. repartition does a full shuffle
and distributes the data equally across all partitions, whereas coalesce can reduce the number of files to the
given 'Int' argument without a full data shuffle.
*/
// coalesce(10) could create up to 10 parquet files under each partition,
// if the data is large and partitioning makes sense.
// Control number of files in each partition by coalesce
Member:
Control number of files -> Control the number of files

Contributor Author:
@HyukjinKwon Thanks for highlighting; fixed.

hiveTableDF.coalesce(10).write.mode(SaveMode.Overwrite)
Contributor:
ditto

.partitionBy("key").parquet(hiveExternalTableLocation)
// $example off:spark_hive$