Add tests and some comments
HyukjinKwon committed Mar 2, 2016
commit 04e4a5167c49b6359f0cfac29d7f2f0c84bd497d
12 changes: 12 additions & 0 deletions python/pyspark/sql/readwriter.py
@@ -487,6 +487,12 @@ def parquet(self, path, mode=None, partitionBy=None):
* ``error`` (default case): Throw an exception if data already exists.
:param partitionBy: names of partitioning columns

You can set the following Parquet-specific option(s) for writing Parquet files:
* ``compression`` (default ``None``): compression codec to use when saving to file.
This can be one of the known case-insensitive shortened names
(``uncompressed``, ``snappy``, ``gzip``, and ``lzo``).
This will override ``spark.sql.parquet.compression.codec``.

Contributor comment (on the ``compression`` option above): maybe just put this as an argument?

>>> df.write.parquet(os.path.join(tempfile.mkdtemp(), 'data'))
"""
self.mode(mode)
@@ -547,6 +553,12 @@ def orc(self, path, mode=None, partitionBy=None):
* ``error`` (default case): Throw an exception if data already exists.
:param partitionBy: names of partitioning columns

You can set the following ORC-specific option(s) for writing ORC files:
* ``compression`` (default ``None``): compression codec to use when saving to file.
This can be one of the known case-insensitive shortened names
(``uncompressed``, ``snappy``, ``zlib``, and ``lzo``).
This will override ``orc.compress``.

>>> orc_df = hiveContext.read.orc('python/test_support/sql/orc_partitioned')
>>> orc_df.write.orc(os.path.join(tempfile.mkdtemp(), 'data'))
"""
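For reference, a minimal PySpark sketch of the option-based usage these docstrings describe. This is a hedged illustration rather than part of the change: the `hiveContext` name and the temporary paths follow the doctests above, and the codec short names are the ones listed in the new documentation.

# Sketch: setting the new `compression` option when writing Parquet and ORC from PySpark.
# Assumes a HiveContext named `hiveContext`, as in the doctests above.
import os
import tempfile

df = hiveContext.createDataFrame([(1, 'a'), (2, 'b')], ['id', 'value'])

# Parquet: use one of the documented case-insensitive short names.
df.write.option('compression', 'snappy').parquet(os.path.join(tempfile.mkdtemp(), 'data'))

# ORC: same pattern; note that orc() currently requires a HiveContext.
df.write.option('compression', 'zlib').orc(os.path.join(tempfile.mkdtemp(), 'data'))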
10 changes: 10 additions & 0 deletions sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala
@@ -468,6 +468,11 @@ final class DataFrameWriter private[sql](df: DataFrame) {
* format("parquet").save(path)
* }}}
*
* You can set the following Parquet-specific option(s) for writing Parquet files:
* <li>`compression` (default `null`): compression codec to use when saving to file. This can be
* one of the known case-insensitive shortened names (`uncompressed`, `snappy`, `gzip`, and
* `lzo`). This will override `spark.sql.parquet.compression.codec`.</li>
*
* @since 1.4.0
*/
def parquet(path: String): Unit = format("parquet").save(path)
@@ -479,6 +484,11 @@ final class DataFrameWriter private[sql](df: DataFrame) {
* format("orc").save(path)
* }}}
*
* You can set the following ORC-specific option(s) for writing ORC files:
* <li>`compression` (default `null`): compression codec to use when saving to file. This can be
* one of the known case-insensitive shortened names (`uncompressed`, `snappy`, `zlib`, and
* `lzo`). This will override `orc.compress`.</li>
*
* @since 1.5.0
* @note Currently, this method can only be used together with `HiveContext`.
*/
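The Scala `DataFrameWriter` scaladoc above documents the same `compression` option that the ParquetIOSuite test below exercises. As a hedged PySpark sketch of what that test checks (the context name, paths, and the `.gz.parquet` suffix expectation are taken from the test, not from a separate API guarantee):

# Sketch: the gzip short name is accepted case-insensitively and produces '.gz.parquet' part files.
# Assumes a SQLContext named `sqlContext`; loosely mirrors the ParquetIOSuite test that follows.
import os
import tempfile

path = os.path.join(tempfile.mkdtemp(), 'data')
df = sqlContext.createDataFrame([(i, str(i % 2)) for i in range(1, 6)], ['a', 'b'])

df.write.option('compression', 'GzIP').parquet(path)

# Gzip-compressed Parquet part files should end with '.gz.parquet'.
assert any(name.endswith('.gz.parquet') for name in os.listdir(path))

# Reading the data back should round-trip the original rows.
copy_df = sqlContext.read.parquet(path)
assert sorted(df.collect()) == sorted(copy_df.collect())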
@@ -746,20 +746,22 @@ class ParquetIOSuite extends QueryTest with ParquetTest with SharedSQLContext {
}

test("SPARK-13543: Support for specifying compression codec for Parquet via option()") {
withTempPath { dir =>
val path = s"${dir.getCanonicalPath}/table1"
val df = (1 to 5).map(i => (i, (i % 2).toString)).toDF("a", "b")
df.write
.option("compression", "gZiP")
.parquet(path)

val compressedFiles = new File(path).listFiles()
assert(compressedFiles.exists(_.getName.endsWith(".gz.parquet")))

val copyDf = sqlContext
.read
.parquet(path)
checkAnswer(df, copyDf)
withSQLConf(SQLConf.PARQUET_COMPRESSION.key -> "UNCOMPRESSED") {
withTempPath { dir =>
val path = s"${dir.getCanonicalPath}/table1"
val df = (1 to 5).map(i => (i, (i % 2).toString)).toDF("a", "b")
df.write
.option("compression", "GzIP")
.parquet(path)

val compressedFiles = new File(path).listFiles()
assert(compressedFiles.exists(_.getName.endsWith(".gz.parquet")))

val copyDf = sqlContext
.read
.parquet(path)
checkAnswer(df, copyDf)
}
}
}
}
@@ -23,15 +23,14 @@ import com.google.common.base.Objects
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileStatus, Path}
import org.apache.hadoop.hive.conf.HiveConf.ConfVars
import org.apache.hadoop.hive.ql.io.orc.OrcFile.OrcTableProperties
import org.apache.hadoop.hive.ql.io.orc._
import org.apache.hadoop.hive.ql.io.orc.OrcFile.OrcTableProperties
import org.apache.hadoop.hive.serde2.objectinspector.SettableStructObjectInspector
import org.apache.hadoop.hive.serde2.typeinfo.{StructTypeInfo, TypeInfoUtils}
import org.apache.hadoop.io.{NullWritable, Writable}
import org.apache.hadoop.mapred.{InputFormat => MapRedInputFormat, JobConf, OutputFormat => MapRedOutputFormat, RecordWriter, Reporter}
import org.apache.hadoop.mapreduce.{Job, TaskAttemptContext}
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat
import org.apache.parquet.hadoop.metadata.CompressionCodecName

import org.apache.spark.Logging
import org.apache.spark.broadcast.Broadcast
@@ -234,7 +233,7 @@ private[sql] class OrcRelation(
.shortOrcCompressionCodecNames
.getOrElse(codecName, CompressionKind.NONE).name())
}

job.getConfiguration match {
case conf: JobConf =>
conf.setOutputFormat(classOf[OrcOutputFormat])
@@ -366,6 +365,7 @@ private[orc] object OrcRelation {
// The ORC compression short names
val shortOrcCompressionCodecNames = Map(
"NONE" -> CompressionKind.NONE,
"UNCOMPRESSED" -> CompressionKind.NONE,
"SNAPPY" -> CompressionKind.SNAPPY,
"ZLIB" -> CompressionKind.ZLIB,
"LZO" -> CompressionKind.LZO)
@@ -224,7 +224,7 @@ class ParquetHadoopFsRelationSuite extends HadoopFsRelationTest {
.read
.parquet(path)

checkAnswer(df,copyDf)
checkAnswer(df, copyDf)
}
}
}