12 changes: 12 additions & 0 deletions python/pyspark/sql/readwriter.py
@@ -487,6 +487,12 @@ def parquet(self, path, mode=None, partitionBy=None):
* ``error`` (default case): Throw an exception if data already exists.
:param partitionBy: names of partitioning columns

You can set the following Parquet-specific option(s) for writing Parquet files:
* ``compression`` (default ``None``): compression codec to use when saving to file.
Contributor comment: maybe just put this as an argument?

This can be one of the known case-insensitive short names
(``uncompressed``, ``snappy``, ``gzip``, and ``lzo``).
This will override ``spark.sql.parquet.compression.codec``.

>>> df.write.parquet(os.path.join(tempfile.mkdtemp(), 'data'))
"""
self.mode(mode)
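
For reference, a minimal usage sketch of this option through the Scala DataFrameWriter API (the PySpark call documented above takes the same option name and codec values); the DataFrame `df` and the output path are illustrative:

    // Write Parquet with a per-write codec; "gzip" is one of the
    // case-insensitive short names listed in the docstring above.
    df.write
      .option("compression", "gzip")
      .parquet("/tmp/spark-13543/parquet_gzip")
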
@@ -547,6 +553,12 @@ def orc(self, path, mode=None, partitionBy=None):
* ``error`` (default case): Throw an exception if data already exists.
:param partitionBy: names of partitioning columns

You can set the following ORC-specific option(s) for writing ORC files:
* ``compression`` (default ``None``): compression codec to use when saving to file.
This can be one of the known case-insensitive short names
(``uncompressed``, ``snappy``, ``zlib``, and ``lzo``).
This will override ``orc.compress``.

>>> orc_df = hiveContext.read.orc('python/test_support/sql/orc_partitioned')
>>> orc_df.write.orc(os.path.join(tempfile.mkdtemp(), 'data'))
"""
10 changes: 10 additions & 0 deletions sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala
@@ -468,6 +468,11 @@ final class DataFrameWriter private[sql](df: DataFrame) {
* format("parquet").save(path)
* }}}
*
* You can set the following Parquet-specific option(s) for writing Parquet files:
* <li>`compression` (default `null`): compression codec to use when saving to file. This can be
* one of the known case-insensitive short names (`uncompressed`, `snappy`, `gzip`, and
* `lzo`). This will override `spark.sql.parquet.compression.codec`.</li>
*
* @since 1.4.0
*/
def parquet(path: String): Unit = format("parquet").save(path)
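
A short sketch of the documented precedence, assuming an existing SQLContext `sqlContext` and DataFrame `df`: the per-write option wins over the session-level codec setting.

    // The session default asks for snappy...
    sqlContext.setConf("spark.sql.parquet.compression.codec", "snappy")
    // ...but the per-write option takes precedence, so these files are gzip-compressed.
    df.write.option("compression", "gzip").parquet("/tmp/parquet_option_wins")
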
@@ -479,6 +484,11 @@ final class DataFrameWriter private[sql](df: DataFrame) {
* format("orc").save(path)
* }}}
*
* You can set the following ORC-specific option(s) for writing ORC files:
* <li>`compression` (default `null`): compression codec to use when saving to file. This can be
* one of the known case-insensitive short names (`uncompressed`, `snappy`, `zlib`, and
* `lzo`). This will override `orc.compress`.</li>
*
* @since 1.5.0
* @note Currently, this method can only be used together with `HiveContext`.
*/
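
Conversely, a codec name outside the lists above is rejected up front by the validation added to ParquetRelation and OrcRelation further down in this diff. A hedged sketch of what a caller would see, assuming the IllegalArgumentException surfaces unwrapped:

    // An unsupported short name fails fast, before any files are written:
    // "Codec [bzip2] is not available. Available codecs are ...".
    try {
      df.write.option("compression", "bzip2").orc("/tmp/orc_bad_codec")
    } catch {
      case e: IllegalArgumentException => println(e.getMessage)
    }
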
@@ -148,6 +148,19 @@ private[sql] class ParquetRelation(
.get(ParquetRelation.METASTORE_SCHEMA)
.map(DataType.fromJson(_).asInstanceOf[StructType])

private val compressionCodec: Option[String] = parameters
.get("compression")
.map { codecName =>
// Validate if given compression codec is supported or not.
val shortParquetCompressionCodecNames = ParquetRelation.shortParquetCompressionCodecNames
if (!shortParquetCompressionCodecNames.contains(codecName.toUpperCase)) {
val availableCodecs = shortParquetCompressionCodecNames.keys.map(_.toLowerCase)
throw new IllegalArgumentException(s"Codec [$codecName] " +
s"is not available. Available codecs are ${availableCodecs.mkString(", ")}.")
}
codecName.toUpperCase
}

private lazy val metadataCache: MetadataCache = {
val meta = new MetadataCache
meta.refresh()
@@ -286,7 +299,8 @@
ParquetRelation
.shortParquetCompressionCodecNames
.getOrElse(
sqlContext.conf.parquetCompressionCodec.toUpperCase,
compressionCodec
.getOrElse(sqlContext.conf.parquetCompressionCodec.toUpperCase),
CompressionCodecName.UNCOMPRESSED).name())

new BucketedOutputWriterFactory {
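
A hypothetical standalone mirror of the lookup above, to spell out the resolution order (the short-name map is passed in here purely for illustration; the real one lives in the ParquetRelation companion object, and CompressionCodecName comes from parquet-mr):

    import org.apache.parquet.hadoop.metadata.CompressionCodecName

    // `option` is the already-validated, upper-cased "compression" option; `sessionCodec`
    // is spark.sql.parquet.compression.codec. The UNCOMPRESSED fallback can only be hit
    // by an unrecognized session-conf value, since option values were validated when the
    // relation was created.
    def resolveParquetCodec(
        option: Option[String],
        sessionCodec: String,
        shortNames: Map[String, CompressionCodecName]): String = {
      shortNames
        .getOrElse(option.getOrElse(sessionCodec.toUpperCase), CompressionCodecName.UNCOMPRESSED)
        .name()
    }
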
@@ -23,7 +23,8 @@ import com.google.common.base.Objects
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileStatus, Path}
import org.apache.hadoop.hive.conf.HiveConf.ConfVars
import org.apache.hadoop.hive.ql.io.orc.{OrcInputFormat, OrcOutputFormat, OrcSerde, OrcSplit, OrcStruct}
import org.apache.hadoop.hive.ql.io.orc._
import org.apache.hadoop.hive.ql.io.orc.OrcFile.OrcTableProperties
import org.apache.hadoop.hive.serde2.objectinspector.SettableStructObjectInspector
import org.apache.hadoop.hive.serde2.typeinfo.{StructTypeInfo, TypeInfoUtils}
import org.apache.hadoop.io.{NullWritable, Writable}
@@ -162,6 +163,19 @@ private[sql] class OrcRelation(
extends HadoopFsRelation(maybePartitionSpec, parameters)
with Logging {

private val compressionCodec: Option[String] = parameters
.get("compression")
.map { codecName =>
// Validate if given compression codec is supported or not.
val shortOrcCompressionCodecNames = OrcRelation.shortOrcCompressionCodecNames
if (!shortOrcCompressionCodecNames.contains(codecName.toUpperCase)) {
val availableCodecs = shortOrcCompressionCodecNames.keys.map(_.toLowerCase)
throw new IllegalArgumentException(s"Codec [$codecName] " +
s"is not available. Available codecs are ${availableCodecs.mkString(", ")}.")
}
codecName.toUpperCase
}

private[sql] def this(
paths: Array[String],
maybeDataSchema: Option[StructType],
@@ -211,6 +225,15 @@ private[sql] class OrcRelation(
}

override def prepareJobForWrite(job: Job): BucketedOutputWriterFactory = {
// Sets compression scheme
compressionCodec.foreach { codecName =>
job.getConfiguration.set(
OrcTableProperties.COMPRESSION.getPropName,
OrcRelation
.shortOrcCompressionCodecNames
.getOrElse(codecName, CompressionKind.NONE).name())
}

job.getConfiguration match {
case conf: JobConf =>
conf.setOutputFormat(classOf[OrcOutputFormat])
@@ -337,3 +360,14 @@ private[orc] object OrcTableScan {
// This constant duplicates `OrcInputFormat.SARG_PUSHDOWN`, which is unfortunately not public.
private[orc] val SARG_PUSHDOWN = "sarg.pushdown"
}

private[orc] object OrcRelation {
// The ORC compression short names
val shortOrcCompressionCodecNames = Map(
"NONE" -> CompressionKind.NONE,
"UNCOMPRESSED" -> CompressionKind.NONE,
"SNAPPY" -> CompressionKind.SNAPPY,
"ZLIB" -> CompressionKind.ZLIB,
"LZO" -> CompressionKind.LZO)
}
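
A small illustrative check of how this map pairs with the upper-casing done in the validation block: lookups are effectively case-insensitive, and both the `none` and `uncompressed` spellings resolve to no compression.

    // Illustrative only: the relation upper-cases the option value before this lookup.
    val kinds = OrcRelation.shortOrcCompressionCodecNames
    assert(kinds("zlib".toUpperCase) == CompressionKind.ZLIB)
    assert(kinds("NONE") == CompressionKind.NONE)
    assert(kinds("UNCOMPRESSED") == CompressionKind.NONE)
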

@@ -17,7 +17,10 @@

package org.apache.spark.sql.hive.orc

import org.apache.hadoop.fs.Path
import java.io.File

import org.apache.hadoop.fs.{FileSystem, Path}
import org.apache.hadoop.hive.ql.io.orc.{CompressionKind, OrcFile}

import org.apache.spark.deploy.SparkHadoopUtil
import org.apache.spark.sql.Row
@@ -81,4 +84,28 @@ class OrcHadoopFsRelationSuite extends HadoopFsRelationTest {
}
}
}

test("SPARK-13543: Support for specifying compression codec for ORC via option()") {
withTempPath { dir =>
val path = s"${dir.getCanonicalPath}/table1"
val df = (1 to 5).map(i => (i, (i % 2).toString)).toDF("a", "b")
df.write
.option("compression", "ZlIb")
.orc(path)

// Check if this is compressed as ZLIB.
val conf = sparkContext.hadoopConfiguration
val fs = FileSystem.getLocal(conf)
val maybeOrcFile = new File(path).listFiles().find(_.getName.endsWith(".orc"))
assert(maybeOrcFile.isDefined)
val orcFilePath = new Path(maybeOrcFile.get.toPath.toString)
val orcReader = OrcFile.createReader(orcFilePath, OrcFile.readerOptions(conf))
assert(orcReader.getCompression == CompressionKind.ZLIB)

val copyDf = sqlContext
.read
.orc(path)
checkAnswer(df, copyDf)
}
}
}
@@ -208,4 +208,24 @@ class ParquetHadoopFsRelationSuite extends HadoopFsRelationTest {
checkAnswer(loadedDF, df)
}
}

test("SPARK-13543: Support for specifying compression codec for Parquet via option()") {
withSQLConf(SQLConf.PARQUET_COMPRESSION.key -> "UNCOMPRESSED") {
withTempPath { dir =>
val path = s"${dir.getCanonicalPath}/table1"
val df = (1 to 5).map(i => (i, (i % 2).toString)).toDF("a", "b")
df.write
.option("compression", "GzIP")
.parquet(path)

val compressedFiles = new File(path).listFiles()
assert(compressedFiles.exists(_.getName.endsWith(".gz.parquet")))

val copyDf = sqlContext
.read
.parquet(path)
checkAnswer(df, copyDf)
}
}
}
}