This repository was archived by the owner on Jan 9, 2020. It is now read-only.
Closed
Changes from 1 commit
Commits (37)
- 999ec13 [SPARK-22570][SQL] Avoid to create a lot of global variables by using… (kiszk, Nov 30, 2017)
- 6ac57fd [SPARK-21417][SQL] Infer join conditions using propagated constraints (Nov 30, 2017)
- bcceab6 [SPARK-22489][SQL] Shouldn't change broadcast join buildSide if user … (wangyum, Nov 30, 2017)
- f5f8e84 [SPARK-22614] Dataset API: repartitionByRange(...) (adrian-ionescu, Nov 30, 2017)
- 7e5f669 [SPARK-22428][DOC] Add spark application garbage collector configurat… (gaborgsomogyi, Dec 1, 2017)
- 7da1f57 [SPARK-22373] Bump Janino dependency version to fix thread safety issue… (Victsm, Dec 1, 2017)
- dc36542 [SPARK-22653] executorAddress registered in CoarseGrainedSchedulerBac… (tgravescs, Dec 1, 2017)
- 16adaf6 [SPARK-22601][SQL] Data load is getting displayed successful on provi… (sujith71955, Dec 1, 2017)
- 9d06a9e [SPARK-22393][SPARK-SHELL] spark-shell can't find imported types in c… (mpetruska, Dec 1, 2017)
- ee10ca7 [SPARK-22638][SS] Use a separate queue for StreamingQueryListenerBus (zsxwing, Dec 1, 2017)
- aa4cf2b [SPARK-22651][PYTHON][ML] Prevent initiating multiple Hive clients fo… (HyukjinKwon, Dec 2, 2017)
- d2cf95a [SPARK-22634][BUILD] Update Bouncy Castle to 1.58 (srowen, Dec 2, 2017)
- f23dddf [SPARK-20682][SPARK-15474][SPARK-21791] Add new ORCFileFormat based o… (dongjoon-hyun, Dec 3, 2017; shown below)
- 2c16267 [SPARK-22669][SQL] Avoid unnecessary function calls in code generation (mgaido91, Dec 3, 2017)
- dff440f [SPARK-22626][SQL] deals with wrong Hive's statistics (zero rowCount) (wangyum, Dec 3, 2017)
- 4131ad0 [SPARK-22489][DOC][FOLLOWUP] Update broadcast behavior changes in mig… (wangyum, Dec 4, 2017)
- 3927bb9 [SPARK-22473][FOLLOWUP][TEST] Remove deprecated Date functions (mgaido91, Dec 4, 2017)
- f81401e [SPARK-22162] Executors and the driver should use consistent JobIDs i… (Dec 4, 2017)
- e1dd03e [SPARK-22372][CORE, YARN] Make cluster submission use SparkApplication. (Dec 4, 2017)
- dcaac45 Spark on Kubernetes - basic submission client (liyinan926, Nov 10, 2017)
- 27c67ff Addressed first round of review comments (liyinan926, Nov 27, 2017)
- 6d597d0 Made Client implement the SparkApplication trait (liyinan926, Nov 28, 2017)
- 5b9fa39 Addressed the second round of comments (liyinan926, Nov 28, 2017)
- 5ccadb5 Added missing step for supporting local:// dependencies and addressed… (liyinan926, Nov 30, 2017)
- 12f2797 Fixed Scala style check errors (liyinan926, Nov 30, 2017)
- c35fe48 Addressed another round of comments (liyinan926, Dec 4, 2017)
- faa2849 Rebased on master and added a constant val for the Client class (liyinan926, Dec 4, 2017)
- 347ed69 Addressed another major round of comments (liyinan926, Dec 5, 2017)
- 0e8ca01 Addressed one more round of comments (liyinan926, Dec 5, 2017)
- 3a0b8e3 Removed mentioning of kubernetes-namespace (liyinan926, Dec 6, 2017)
- 83d0b9c Fixed a couple of bugs found during manual tests (liyinan926, Dec 7, 2017)
- 44c40b1 Guard against client mode in SparkContext (liyinan926, Dec 8, 2017)
- 67bc847 Added libc6-compat into the base docker image (liyinan926, Dec 8, 2017)
- 7d2b303 Addressed latest comments (liyinan926, Dec 8, 2017)
- caf2206 Addressed docs comments (liyinan926, Dec 9, 2017)
- 2e7810b Fixed a comment (liyinan926, Dec 11, 2017)
- cbcd30e Addressed latest comments (liyinan926, Dec 11, 2017)
[SPARK-20682][SPARK-15474][SPARK-21791] Add new ORCFileFormat based on ORC 1.4.1

## What changes were proposed in this pull request?

Since [SPARK-2883](https://issues.apache.org/jira/browse/SPARK-2883), Apache Spark has supported Apache ORC inside the `sql/hive` module with a Hive dependency. This PR aims to add a new ORC data source inside `sql/core` and to eventually replace the old ORC data source. This PR resolves the following three issues (a short usage sketch follows the list).

- [SPARK-20682](https://issues.apache.org/jira/browse/SPARK-20682): Add new ORCFileFormat based on Apache ORC 1.4.1
- [SPARK-15474](https://issues.apache.org/jira/browse/SPARK-15474): ORC data source fails to write and read back empty dataframe
- [SPARK-21791](https://issues.apache.org/jira/browse/SPARK-21791): ORC should support column names with dot
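
As a quick illustration of what the new data source enables, here is a minimal, hypothetical usage sketch (not part of this diff). It assumes the `spark.sql.orc.impl` switch that selects the `sql/core` implementation; the output path and object name are made up.

```scala
// Hypothetical sketch: exercise the new ORC data source with a dotted column name (SPARK-21791).
// The spark.sql.orc.impl setting and the /tmp output path are assumptions, not part of this PR's diff.
import org.apache.spark.sql.SparkSession

object NewOrcDataSourceSketch {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .appName("new-orc-sketch")
      .master("local[*]")
      .config("spark.sql.orc.impl", "native") // assumed switch to the sql/core ORC implementation
      .getOrCreate()
    import spark.implicits._

    // A column name containing dots should round-trip through ORC.
    val df = Seq((1, "a"), (2, "b")).toDF("id", "col.with.dot")
    df.write.mode("overwrite").orc("/tmp/orc-dotted")
    spark.read.orc("/tmp/orc-dotted").select("`col.with.dot`").show()

    spark.stop()
  }
}
```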

## How was this patch tested?

Passes Jenkins with all existing tests and new tests for SPARK-15474 and SPARK-21791.
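
For illustration only, a hedged sketch of the kind of regression test SPARK-15474 calls for; the suite name, paths, and ScalaTest style are assumptions, and this is not the test added by the PR.

```scala
// Hypothetical regression-test sketch for SPARK-15474 (empty DataFrame round-trip).
// Suite name and structure are assumptions; the PR's actual tests live in the ORC test suites.
import java.nio.file.Files

import org.apache.spark.sql.SparkSession
import org.scalatest.funsuite.AnyFunSuite

class EmptyOrcRoundTripSuite extends AnyFunSuite {
  test("an empty DataFrame can be written to ORC and read back") {
    val spark = SparkSession.builder().master("local[*]").getOrCreate()
    import spark.implicits._

    val dir = Files.createTempDirectory("orc-empty").toString
    val empty = Seq.empty[(Int, String)].toDF("id", "name")
    empty.write.mode("overwrite").orc(dir)

    val readBack = spark.read.orc(dir)
    // Compare field names rather than full schemas to sidestep nullability differences.
    assert(readBack.schema.fieldNames.toSeq === Seq("id", "name"))
    assert(readBack.count() === 0L)
    spark.stop()
  }
}
```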

Author: Dongjoon Hyun <[email protected]>
Author: Wenchen Fan <[email protected]>

Closes apache#19651 from dongjoon-hyun/SPARK-20682.
dongjoon-hyun authored and cloud-fan committed Dec 3, 2017
commit f23dddf105aef88531b3572ad70889cf2fc300c9
OrcDeserializer.scala (new file)
@@ -0,0 +1,243 @@
/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements. See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License. You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.spark.sql.execution.datasources.orc

import org.apache.hadoop.io._
import org.apache.orc.mapred.{OrcList, OrcMap, OrcStruct, OrcTimestamp}
import org.apache.orc.storage.serde2.io.{DateWritable, HiveDecimalWritable}

import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.expressions.{SpecificInternalRow, UnsafeArrayData}
import org.apache.spark.sql.catalyst.util._
import org.apache.spark.sql.types._
import org.apache.spark.unsafe.types.UTF8String

/**
 * A deserializer to deserialize ORC structs to Spark rows.
 */
class OrcDeserializer(
    dataSchema: StructType,
    requiredSchema: StructType,
    requestedColIds: Array[Int]) {

  private val resultRow = new SpecificInternalRow(requiredSchema.map(_.dataType))

  private val fieldWriters: Array[WritableComparable[_] => Unit] = {
    requiredSchema.zipWithIndex
      // The value of missing columns are always null, do not need writers.
      .filterNot { case (_, index) => requestedColIds(index) == -1 }
      .map { case (f, index) =>
        val writer = newWriter(f.dataType, new RowUpdater(resultRow))
        (value: WritableComparable[_]) => writer(index, value)
      }.toArray
  }

  private val validColIds = requestedColIds.filterNot(_ == -1)

  def deserialize(orcStruct: OrcStruct): InternalRow = {
    var i = 0
    while (i < validColIds.length) {
      val value = orcStruct.getFieldValue(validColIds(i))
      if (value == null) {
        resultRow.setNullAt(i)
      } else {
        fieldWriters(i)(value)
      }
      i += 1
    }
    resultRow
  }

  /**
   * Creates a writer to write ORC values to Catalyst data structure at the given ordinal.
   */
  private def newWriter(
      dataType: DataType, updater: CatalystDataUpdater): (Int, WritableComparable[_]) => Unit =
    dataType match {
      case NullType => (ordinal, _) =>
        updater.setNullAt(ordinal)

      case BooleanType => (ordinal, value) =>
        updater.setBoolean(ordinal, value.asInstanceOf[BooleanWritable].get)

      case ByteType => (ordinal, value) =>
        updater.setByte(ordinal, value.asInstanceOf[ByteWritable].get)

      case ShortType => (ordinal, value) =>
        updater.setShort(ordinal, value.asInstanceOf[ShortWritable].get)

      case IntegerType => (ordinal, value) =>
        updater.setInt(ordinal, value.asInstanceOf[IntWritable].get)

      case LongType => (ordinal, value) =>
        updater.setLong(ordinal, value.asInstanceOf[LongWritable].get)

      case FloatType => (ordinal, value) =>
        updater.setFloat(ordinal, value.asInstanceOf[FloatWritable].get)

      case DoubleType => (ordinal, value) =>
        updater.setDouble(ordinal, value.asInstanceOf[DoubleWritable].get)

      case StringType => (ordinal, value) =>
        updater.set(ordinal, UTF8String.fromBytes(value.asInstanceOf[Text].copyBytes))

      case BinaryType => (ordinal, value) =>
        val binary = value.asInstanceOf[BytesWritable]
        val bytes = new Array[Byte](binary.getLength)
        System.arraycopy(binary.getBytes, 0, bytes, 0, binary.getLength)
        updater.set(ordinal, bytes)

      case DateType => (ordinal, value) =>
        updater.setInt(ordinal, DateTimeUtils.fromJavaDate(value.asInstanceOf[DateWritable].get))

      case TimestampType => (ordinal, value) =>
        updater.setLong(ordinal, DateTimeUtils.fromJavaTimestamp(value.asInstanceOf[OrcTimestamp]))

      case DecimalType.Fixed(precision, scale) => (ordinal, value) =>
        val decimal = value.asInstanceOf[HiveDecimalWritable].getHiveDecimal()
        val v = Decimal(decimal.bigDecimalValue, decimal.precision(), decimal.scale())
        v.changePrecision(precision, scale)
        updater.set(ordinal, v)

      case st: StructType => (ordinal, value) =>
        val result = new SpecificInternalRow(st)
        val fieldUpdater = new RowUpdater(result)
        val fieldConverters = st.map(_.dataType).map { dt =>
          newWriter(dt, fieldUpdater)
        }.toArray
        val orcStruct = value.asInstanceOf[OrcStruct]

        var i = 0
        while (i < st.length) {
          val value = orcStruct.getFieldValue(i)
          if (value == null) {
            result.setNullAt(i)
          } else {
            fieldConverters(i)(i, value)
          }
          i += 1
        }

        updater.set(ordinal, result)

      case ArrayType(elementType, _) => (ordinal, value) =>
        val orcArray = value.asInstanceOf[OrcList[WritableComparable[_]]]
        val length = orcArray.size()
        val result = createArrayData(elementType, length)
        val elementUpdater = new ArrayDataUpdater(result)
        val elementConverter = newWriter(elementType, elementUpdater)

        var i = 0
        while (i < length) {
          val value = orcArray.get(i)
          if (value == null) {
            result.setNullAt(i)
          } else {
            elementConverter(i, value)
          }
          i += 1
        }

        updater.set(ordinal, result)

      case MapType(keyType, valueType, _) => (ordinal, value) =>
        val orcMap = value.asInstanceOf[OrcMap[WritableComparable[_], WritableComparable[_]]]
        val length = orcMap.size()
        val keyArray = createArrayData(keyType, length)
        val keyUpdater = new ArrayDataUpdater(keyArray)
        val keyConverter = newWriter(keyType, keyUpdater)
        val valueArray = createArrayData(valueType, length)
        val valueUpdater = new ArrayDataUpdater(valueArray)
        val valueConverter = newWriter(valueType, valueUpdater)

        var i = 0
        val it = orcMap.entrySet().iterator()
        while (it.hasNext) {
          val entry = it.next()
          keyConverter(i, entry.getKey)
          val value = entry.getValue
          if (value == null) {
            valueArray.setNullAt(i)
          } else {
            valueConverter(i, value)
          }
          i += 1
        }

        updater.set(ordinal, new ArrayBasedMapData(keyArray, valueArray))

      case udt: UserDefinedType[_] => newWriter(udt.sqlType, updater)

      case _ =>
        throw new UnsupportedOperationException(s"$dataType is not supported yet.")
    }

  private def createArrayData(elementType: DataType, length: Int): ArrayData = elementType match {
    case BooleanType => UnsafeArrayData.fromPrimitiveArray(new Array[Boolean](length))
    case ByteType => UnsafeArrayData.fromPrimitiveArray(new Array[Byte](length))
    case ShortType => UnsafeArrayData.fromPrimitiveArray(new Array[Short](length))
    case IntegerType => UnsafeArrayData.fromPrimitiveArray(new Array[Int](length))
    case LongType => UnsafeArrayData.fromPrimitiveArray(new Array[Long](length))
    case FloatType => UnsafeArrayData.fromPrimitiveArray(new Array[Float](length))
    case DoubleType => UnsafeArrayData.fromPrimitiveArray(new Array[Double](length))
    case _ => new GenericArrayData(new Array[Any](length))
  }

  /**
   * A base interface for updating values inside catalyst data structure like `InternalRow` and
   * `ArrayData`.
   */
  sealed trait CatalystDataUpdater {
    def set(ordinal: Int, value: Any): Unit

    def setNullAt(ordinal: Int): Unit = set(ordinal, null)
    def setBoolean(ordinal: Int, value: Boolean): Unit = set(ordinal, value)
    def setByte(ordinal: Int, value: Byte): Unit = set(ordinal, value)
    def setShort(ordinal: Int, value: Short): Unit = set(ordinal, value)
    def setInt(ordinal: Int, value: Int): Unit = set(ordinal, value)
    def setLong(ordinal: Int, value: Long): Unit = set(ordinal, value)
    def setDouble(ordinal: Int, value: Double): Unit = set(ordinal, value)
    def setFloat(ordinal: Int, value: Float): Unit = set(ordinal, value)
  }

  final class RowUpdater(row: InternalRow) extends CatalystDataUpdater {
    override def setNullAt(ordinal: Int): Unit = row.setNullAt(ordinal)
    override def set(ordinal: Int, value: Any): Unit = row.update(ordinal, value)

    override def setBoolean(ordinal: Int, value: Boolean): Unit = row.setBoolean(ordinal, value)
    override def setByte(ordinal: Int, value: Byte): Unit = row.setByte(ordinal, value)
    override def setShort(ordinal: Int, value: Short): Unit = row.setShort(ordinal, value)
    override def setInt(ordinal: Int, value: Int): Unit = row.setInt(ordinal, value)
    override def setLong(ordinal: Int, value: Long): Unit = row.setLong(ordinal, value)
    override def setDouble(ordinal: Int, value: Double): Unit = row.setDouble(ordinal, value)
    override def setFloat(ordinal: Int, value: Float): Unit = row.setFloat(ordinal, value)
  }

  final class ArrayDataUpdater(array: ArrayData) extends CatalystDataUpdater {
    override def setNullAt(ordinal: Int): Unit = array.setNullAt(ordinal)
    override def set(ordinal: Int, value: Any): Unit = array.update(ordinal, value)

    override def setBoolean(ordinal: Int, value: Boolean): Unit = array.setBoolean(ordinal, value)
    override def setByte(ordinal: Int, value: Byte): Unit = array.setByte(ordinal, value)
    override def setShort(ordinal: Int, value: Short): Unit = array.setShort(ordinal, value)
    override def setInt(ordinal: Int, value: Int): Unit = array.setInt(ordinal, value)
    override def setLong(ordinal: Int, value: Long): Unit = array.setLong(ordinal, value)
    override def setDouble(ordinal: Int, value: Double): Unit = array.setDouble(ordinal, value)
    override def setFloat(ordinal: Int, value: Float): Unit = array.setFloat(ordinal, value)
  }
}
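
To make the deserializer's contract above concrete, here is a small, hypothetical driver (not in the diff) that builds an `OrcStruct` by hand and converts it into an `InternalRow`; the schema, values, and object name are invented for illustration.

```scala
// Hypothetical driver for OrcDeserializer: construct an OrcStruct whose layout matches the
// required schema, then deserialize it. Schema and values are made up for illustration.
import org.apache.hadoop.io.{IntWritable, Text}
import org.apache.orc.TypeDescription
import org.apache.orc.mapred.OrcStruct
import org.apache.spark.sql.execution.datasources.orc.OrcDeserializer
import org.apache.spark.sql.types._

object OrcDeserializerSketch {
  def main(args: Array[String]): Unit = {
    val sparkSchema = StructType(Seq(
      StructField("id", IntegerType),
      StructField("name", StringType)))

    // ORC-side struct mirroring the Spark schema.
    val orcSchema = TypeDescription.fromString("struct<id:int,name:string>")
    val struct = new OrcStruct(orcSchema)
    struct.setFieldValue(0, new IntWritable(42))
    struct.setFieldValue(1, new Text("spark"))

    // Both columns exist in the "file", so the requested column ids are 0 and 1.
    val deserializer = new OrcDeserializer(sparkSchema, sparkSchema, Array(0, 1))
    val row = deserializer.deserialize(struct)
    println(s"id = ${row.getInt(0)}, name = ${row.getUTF8String(1)}")
  }
}
```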
OrcFileFormat.scala (modified file)
@@ -17,10 +17,29 @@

package org.apache.spark.sql.execution.datasources.orc

import org.apache.orc.TypeDescription
import java.io._
import java.net.URI

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileStatus, Path}
import org.apache.hadoop.mapred.JobConf
import org.apache.hadoop.mapreduce._
import org.apache.hadoop.mapreduce.lib.input.FileSplit
import org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl
import org.apache.orc._
import org.apache.orc.OrcConf.{COMPRESS, MAPRED_OUTPUT_SCHEMA}
import org.apache.orc.mapred.OrcStruct
import org.apache.orc.mapreduce._

import org.apache.spark.TaskContext
import org.apache.spark.sql.AnalysisException
import org.apache.spark.sql.types.StructType
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.execution.datasources._
import org.apache.spark.sql.sources._
import org.apache.spark.sql.types._
import org.apache.spark.util.SerializableConfiguration

private[sql] object OrcFileFormat {
  private def checkFieldName(name: String): Unit = {
@@ -39,3 +58,119 @@ private[sql] object OrcFileFormat {
    names.foreach(checkFieldName)
  }
}

/**
 * New ORC File Format based on Apache ORC.
 */
class OrcFileFormat
  extends FileFormat
  with DataSourceRegister
  with Serializable {

  override def shortName(): String = "orc"

  override def toString: String = "ORC"

  override def hashCode(): Int = getClass.hashCode()

  override def equals(other: Any): Boolean = other.isInstanceOf[OrcFileFormat]

  override def inferSchema(
      sparkSession: SparkSession,
      options: Map[String, String],
      files: Seq[FileStatus]): Option[StructType] = {
    OrcUtils.readSchema(sparkSession, files)
  }

  override def prepareWrite(
      sparkSession: SparkSession,
      job: Job,
      options: Map[String, String],
      dataSchema: StructType): OutputWriterFactory = {
    val orcOptions = new OrcOptions(options, sparkSession.sessionState.conf)

    val conf = job.getConfiguration

    conf.set(MAPRED_OUTPUT_SCHEMA.getAttribute, dataSchema.catalogString)

    conf.set(COMPRESS.getAttribute, orcOptions.compressionCodec)

    conf.asInstanceOf[JobConf]
      .setOutputFormat(classOf[org.apache.orc.mapred.OrcOutputFormat[OrcStruct]])

    new OutputWriterFactory {
      override def newInstance(
          path: String,
          dataSchema: StructType,
          context: TaskAttemptContext): OutputWriter = {
        new OrcOutputWriter(path, dataSchema, context)
      }

      override def getFileExtension(context: TaskAttemptContext): String = {
        val compressionExtension: String = {
          val name = context.getConfiguration.get(COMPRESS.getAttribute)
          OrcUtils.extensionsForCompressionCodecNames.getOrElse(name, "")
        }

        compressionExtension + ".orc"
      }
    }
  }

  override def isSplitable(
      sparkSession: SparkSession,
      options: Map[String, String],
      path: Path): Boolean = {
    true
  }

  override def buildReader(
      sparkSession: SparkSession,
      dataSchema: StructType,
      partitionSchema: StructType,
      requiredSchema: StructType,
      filters: Seq[Filter],
      options: Map[String, String],
      hadoopConf: Configuration): (PartitionedFile) => Iterator[InternalRow] = {
    if (sparkSession.sessionState.conf.orcFilterPushDown) {
      OrcFilters.createFilter(dataSchema, filters).foreach { f =>
        OrcInputFormat.setSearchArgument(hadoopConf, f, dataSchema.fieldNames)
      }
    }

    val broadcastedConf =
      sparkSession.sparkContext.broadcast(new SerializableConfiguration(hadoopConf))
    val isCaseSensitive = sparkSession.sessionState.conf.caseSensitiveAnalysis

    (file: PartitionedFile) => {
      val conf = broadcastedConf.value.value

      val requestedColIdsOrEmptyFile = OrcUtils.requestedColumnIds(
        isCaseSensitive, dataSchema, requiredSchema, new Path(new URI(file.filePath)), conf)

      if (requestedColIdsOrEmptyFile.isEmpty) {
        Iterator.empty
      } else {
        val requestedColIds = requestedColIdsOrEmptyFile.get
        assert(requestedColIds.length == requiredSchema.length,
          "[BUG] requested column IDs do not match required schema")
        conf.set(OrcConf.INCLUDE_COLUMNS.getAttribute,
          requestedColIds.filter(_ != -1).sorted.mkString(","))

        val fileSplit =
          new FileSplit(new Path(new URI(file.filePath)), file.start, file.length, Array.empty)
        val attemptId = new TaskAttemptID(new TaskID(new JobID(), TaskType.MAP, 0), 0)
        val taskAttemptContext = new TaskAttemptContextImpl(conf, attemptId)

        val orcRecordReader = new OrcInputFormat[OrcStruct]
          .createRecordReader(fileSplit, taskAttemptContext)
        val iter = new RecordReaderIterator[OrcStruct](orcRecordReader)
        Option(TaskContext.get()).foreach(_.addTaskCompletionListener(_ => iter.close()))

        val unsafeProjection = UnsafeProjection.create(requiredSchema)
        val deserializer = new OrcDeserializer(dataSchema, requiredSchema, requestedColIds)
        iter.map(value => unsafeProjection(deserializer.deserialize(value)))
      }
    }
  }
}
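
As a usage note for the write path above, here is a hedged sketch (not in the diff) of how the `compression` option reaches the ORC `COMPRESS` attribute set in `prepareWrite` and, via `getFileExtension`, the output file suffix; the codec choice and output path are assumptions.

```scala
// Hypothetical write-path sketch: the "compression" option is mapped by OrcOptions to the ORC
// COMPRESS attribute, and getFileExtension appends a matching suffix such as ".snappy.orc".
// The codec and the /tmp output path are assumptions for illustration only.
import org.apache.spark.sql.SparkSession

object OrcCompressionSketch {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local[*]").getOrCreate()
    import spark.implicits._

    val df = Seq((1, "a"), (2, "b")).toDF("id", "name")
    df.write
      .mode("overwrite")
      .option("compression", "snappy") // ends up as COMPRESS=SNAPPY in the job configuration
      .orc("/tmp/orc-snappy")

    // The written part files should carry the codec-specific extension, e.g. part-*.snappy.orc.
    spark.stop()
  }
}
```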