Skip to content
Closed
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
Add some tests and functionality for explicitly setting no compression
  • Loading branch information
HyukjinKwon committed Mar 3, 2016
commit 4c1ffc52f20d29e6a300d5665d7ae53ce0335456
Original file line number Diff line number Diff line change
Expand Up @@ -57,10 +57,16 @@ private[datasources] object CompressionCodecs {
* `codec` should be a full class path
*/
def setCodecConfiguration(conf: Configuration, codec: String): Unit = {
  if (codec != null) {
    // Enable compression for both the final output and intermediate map output,
    // using the given codec class name for each.
    conf.set("mapreduce.output.fileoutputformat.compress", "true")
    conf.set("mapreduce.output.fileoutputformat.compress.type", CompressionType.BLOCK.toString)
    conf.set("mapreduce.output.fileoutputformat.compress.codec", codec)
    conf.set("mapreduce.map.output.compress", "true")
    conf.set("mapreduce.map.output.compress.codec", codec)
  } else {
    // A null codec implies the `compression` option was explicitly set to
    // `uncompressed` or `none`, so force compression off even if the Hadoop
    // configuration enables it elsewhere.
    conf.set("mapreduce.output.fileoutputformat.compress", "false")
    conf.set("mapreduce.map.output.compress", "false")
  }
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,8 @@ private[sql] class JSONOptions(
parameters.get("allowNonNumericNumbers").map(_.toBoolean).getOrElse(true)
val allowBackslashEscapingAnyCharacter =
parameters.get("allowBackslashEscapingAnyCharacter").map(_.toBoolean).getOrElse(false)
val compressionCodec = parameters.get("compression").map(CompressionCodecs.getCodecClassName)
private val compressionName = parameters.get("compression")
val compressionCodec = compressionName.map(CompressionCodecs.getCodecClassName)

/** Sets config options on a Jackson [[JsonFactory]]. */
def setJacksonOptions(factory: JsonFactory): Unit = {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,12 @@ import java.io.File
import java.nio.charset.UnsupportedCharsetException
import java.sql.Timestamp

import scala.collection.JavaConverters._

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.io.SequenceFile.CompressionType
import org.apache.hadoop.io.compress.GzipCodec

import org.apache.spark.SparkException
import org.apache.spark.sql.{DataFrame, QueryTest, Row}
import org.apache.spark.sql.test.{SharedSQLContext, SQLTestUtils}
Expand Down Expand Up @@ -396,6 +402,46 @@ class CSVSuite extends QueryTest with SharedSQLContext with SQLTestUtils {
}
}

test("SPARK-13543 write the output as uncompressed via option()") {
  // Force gzip compression through the Hadoop configuration, then verify that
  // `option("compression", "none")` overrides it and produces uncompressed output.
  val clonedConf = new Configuration(hadoopConfiguration)
  hadoopConfiguration.set("mapreduce.output.fileoutputformat.compress", "true")
  hadoopConfiguration
    .set("mapreduce.output.fileoutputformat.compress.type", CompressionType.BLOCK.toString)
  hadoopConfiguration
    .set("mapreduce.output.fileoutputformat.compress.codec", classOf[GzipCodec].getName)
  hadoopConfiguration.set("mapreduce.map.output.compress", "true")
  hadoopConfiguration.set("mapreduce.map.output.compress.codec", classOf[GzipCodec].getName)
  withTempDir { dir =>
    try {
      val csvDir = new File(dir, "csv").getCanonicalPath
      val cars = sqlContext.read
        .format("csv")
        .option("header", "true")
        .load(testFile(carsFile))

      cars.coalesce(1).write
        .format("csv")
        .option("header", "true")
        .option("compression", "none")
        .save(csvDir)

      // No output file should carry the gzip extension; `forall` (rather than
      // `exists`) ensures every part file is uncompressed.
      val compressedFiles = new File(csvDir).listFiles()
      assert(compressedFiles.forall(!_.getName.endsWith(".gz")))

      // Round-trip: the uncompressed output must still be readable and correct.
      val carsCopy = sqlContext.read
        .format("csv")
        .option("header", "true")
        .load(csvDir)

      verifyCars(carsCopy, withHeader = true)
    } finally {
      // Hadoop 1 doesn't have `Configuration.unset`, so restore by clearing and
      // copying back the cloned configuration.
      hadoopConfiguration.clear()
      clonedConf.asScala.foreach(entry => hadoopConfiguration.set(entry.getKey, entry.getValue))
    }
  }
}

test("Schema inference correctly identifies the datatype when data is sparse.") {
val df = sqlContext.read
.format("csv")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,9 +23,10 @@ import java.sql.{Date, Timestamp}
import scala.collection.JavaConverters._

import com.fasterxml.jackson.core.JsonFactory
import org.apache.commons.io.FileUtils
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{Path, PathFilter}
import org.apache.hadoop.io.SequenceFile.CompressionType
import org.apache.hadoop.io.compress.GzipCodec
import org.scalactic.Tolerance._

import org.apache.spark.rdd.RDD
Expand Down Expand Up @@ -1496,31 +1497,46 @@ class JsonSuite extends QueryTest with SharedSQLContext with TestJsonData {
}
}

test("SPARK-13543 write the output as uncompressed via option()") {
  // Force gzip compression through the Hadoop configuration, then verify that
  // `option("compression", "none")` overrides it and produces uncompressed output.
  val clonedConf = new Configuration(hadoopConfiguration)
  hadoopConfiguration.set("mapreduce.output.fileoutputformat.compress", "true")
  hadoopConfiguration
    .set("mapreduce.output.fileoutputformat.compress.type", CompressionType.BLOCK.toString)
  hadoopConfiguration
    .set("mapreduce.output.fileoutputformat.compress.codec", classOf[GzipCodec].getName)
  hadoopConfiguration.set("mapreduce.map.output.compress", "true")
  hadoopConfiguration.set("mapreduce.map.output.compress.codec", classOf[GzipCodec].getName)
  withTempDir { dir =>
    try {
      // Write the raw JSON text under the managed temp dir instead of creating
      // (and leaking) a second temp dir that shadows `dir`. The subdirectory
      // must not exist yet for `saveAsTextFile`.
      val path = new File(dir, "raw").getCanonicalPath
      primitiveFieldAndType.map(record => record.replaceAll("\n", " ")).saveAsTextFile(path)

      val jsonDF = sqlContext.read.json(path)
      val jsonDir = new File(dir, "json").getCanonicalPath
      jsonDF.coalesce(1).write
        .format("json")
        .option("compression", "none")
        .save(jsonDir)

      // No output file should carry the gzip extension; `forall` (rather than
      // `exists`) ensures every part file is uncompressed.
      val compressedFiles = new File(jsonDir).listFiles()
      assert(compressedFiles.forall(!_.getName.endsWith(".gz")))

      // Round-trip: the uncompressed output must still be readable and correct.
      val jsonCopy = sqlContext.read
        .format("json")
        .load(jsonDir)

      assert(jsonCopy.count == jsonDF.count)
      val jsonCopySome = jsonCopy.selectExpr("string", "long", "boolean")
      val jsonDFSome = jsonDF.selectExpr("string", "long", "boolean")
      checkAnswer(jsonCopySome, jsonDFSome)
    } finally {
      // Hadoop 1 doesn't have `Configuration.unset`, so restore by clearing and
      // copying back the cloned configuration.
      hadoopConfiguration.clear()
      clonedConf.asScala.foreach(entry => hadoopConfiguration.set(entry.getKey, entry.getValue))
    }
  }
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,14 @@

package org.apache.spark.sql.execution.datasources.text

import java.io.File

import scala.collection.JavaConverters._

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.io.SequenceFile.CompressionType
import org.apache.hadoop.io.compress.GzipCodec

import org.apache.spark.sql.{AnalysisException, DataFrame, QueryTest, Row, SaveMode}
import org.apache.spark.sql.test.SharedSQLContext
import org.apache.spark.sql.types.{StringType, StructType}
Expand Down Expand Up @@ -59,22 +67,51 @@ class TextSuite extends QueryTest with SharedSQLContext {

test("SPARK-13503 Support to specify the option for compression codec for TEXT") {
  val testDf = sqlContext.read.text(testFile)
  // For each supported codec, write with the `compression` option and check the
  // output files actually carry the codec's file extension.
  val extensionNameMap = Map("bzip2" -> ".bz2", "deflate" -> ".deflate", "gzip" -> ".gz")
  extensionNameMap.foreach { case (codecName, extension) =>
    val tempDir = Utils.createTempDir()
    val tempDirPath = tempDir.getAbsolutePath
    testDf.write.option("compression", codecName).mode(SaveMode.Overwrite).text(tempDirPath)
    val compressedFiles = new File(tempDirPath).listFiles()
    assert(compressedFiles.exists(_.getName.endsWith(extension)))
    // Round-trip: the compressed output must still be readable and correct.
    verifyFrame(sqlContext.read.text(tempDirPath))
  }

  // An unknown codec name must fail fast with a descriptive error.
  val errMsg = intercept[IllegalArgumentException] {
    val tempDirPath = Utils.createTempDir().getAbsolutePath
    testDf.write.option("compression", "illegal").mode(SaveMode.Overwrite).text(tempDirPath)
  }
  assert(errMsg.getMessage.contains("Codec [illegal] is not available. " +
    "Known codecs are"))
}

test("SPARK-13543 write the output as uncompressed via option()") {
  // Force gzip compression through the Hadoop configuration, then verify that
  // `option("compression", "none")` overrides it and produces uncompressed output.
  val clonedConf = new Configuration(hadoopConfiguration)
  hadoopConfiguration.set("mapreduce.output.fileoutputformat.compress", "true")
  hadoopConfiguration
    .set("mapreduce.output.fileoutputformat.compress.type", CompressionType.BLOCK.toString)
  hadoopConfiguration
    .set("mapreduce.output.fileoutputformat.compress.codec", classOf[GzipCodec].getName)
  hadoopConfiguration.set("mapreduce.map.output.compress", "true")
  hadoopConfiguration.set("mapreduce.map.output.compress.codec", classOf[GzipCodec].getName)
  withTempDir { dir =>
    try {
      val testDf = sqlContext.read.text(testFile)
      // Use the temp dir managed by `withTempDir` instead of creating (and
      // leaking) a second, unmanaged temp dir that was previously ignored.
      val tempDirPath = dir.getAbsolutePath
      testDf.write.option("compression", "none").mode(SaveMode.Overwrite).text(tempDirPath)
      // No output file should carry the gzip extension; `forall` (rather than
      // `exists`) ensures every part file is uncompressed.
      val compressedFiles = new File(tempDirPath).listFiles()
      assert(compressedFiles.forall(!_.getName.endsWith(".gz")))
      // Round-trip: the uncompressed output must still be readable and correct.
      verifyFrame(sqlContext.read.text(tempDirPath))
    } finally {
      // Hadoop 1 doesn't have `Configuration.unset`, so restore by clearing and
      // copying back the cloned configuration.
      hadoopConfiguration.clear()
      clonedConf.asScala.foreach(entry => hadoopConfiguration.set(entry.getKey, entry.getValue))
    }
  }
}

/** Classpath URL (as a string) of the `text-suite.txt` test resource. */
private def testFile: String =
  Thread.currentThread().getContextClassLoader.getResource("text-suite.txt").toString
Expand Down