Changes from 1 commit
Commits (26)
00bcf8a  Avoid IO operations on empty files in BlockObjectWriter. (JoshRosen, Apr 21, 2015)
8fd89b4  Do not create empty files at all. (JoshRosen, Apr 21, 2015)
0db87c3  Reduce scope of FileOutputStream in ExternalSorter (JoshRosen, Apr 21, 2015)
7e2340d  Revert "Reduce scope of FileOutputStream in ExternalSorter" (JoshRosen, Apr 22, 2015)
8113eac  Hacky WIP towards speculatively running w/o reset(), then retrying wi… (JoshRosen, Jun 5, 2015)
417f50e  Hackily comment out most of dev/run-tests to speed up Jenkins iteration. (JoshRosen, Jun 5, 2015)
f90dc94  Don't pass configuration to ObjectWritable in SerializableWritable (JoshRosen, Jun 5, 2015)
480d20a  Broadcast configuration in hiveWriterContainers (WIP hack) (JoshRosen, Jun 5, 2015)
55041d2  Use local[*] instead of local[2] (JoshRosen, Jun 5, 2015)
57c2cb4  try in-memory Derby (JoshRosen, Jun 5, 2015)
9db0abc  Avoid writing empty files in BypassMergeSortShuffleWriter (JoshRosen, Jun 5, 2015)
9e116d1  Rework SPARK-7041 for BypassMergeSort split (JoshRosen, Jun 5, 2015)
3fe16e8  Revert "Broadcast configuration in hiveWriterContainers (WIP hack)" (JoshRosen, Jun 5, 2015)
54cd5ce  Merge remote-tracking branch 'origin/master' into file-handle-optimiz… (JoshRosen, Jun 5, 2015)
5c777cf  Rework SPARK-7041 for BypassMergeSort split (JoshRosen, Jun 5, 2015)
5ac11d1  Revert "Use local[*] instead of local[2]" (JoshRosen, Jun 5, 2015)
895de59  Revert "try in-memory Derby" (JoshRosen, Jun 5, 2015)
bf30fee  Revert "Don't pass configuration to ObjectWritable in SerializableWri… (JoshRosen, Jun 5, 2015)
b1e3f82  Revert "Hacky WIP towards speculatively running w/o reset(), then ret… (JoshRosen, Jun 5, 2015)
5113370  Revert "Rework SPARK-7041 for BypassMergeSort split" (JoshRosen, Jun 5, 2015)
fac08d5  SPARK-8135. In SerializableWritable, don't load defaults when instant… (sryza, Jun 5, 2015)
c7caa5c  Merge remote-tracking branch 'origin/master' into file-handle-optimiz… (JoshRosen, Jun 6, 2015)
8c1e1ff  Merge branch 'file-handle-optimizations' into hive-compat-suite-speedup (JoshRosen, Jun 8, 2015)
2b500b9  Only run unsafe tests (testing a jenkins job) (JoshRosen, Jun 12, 2015)
fbd3d03  Add log4j test properties to unsafe project. (JoshRosen, Jun 12, 2015)
46dd005  Try only testing bagel instead (since I don't think Maven logs to uni… (JoshRosen, Jun 12, 2015)
Revert "Broadcast configuration in hiveWriterContainers (WIP hack)"
This reverts commit 480d20a.
JoshRosen committed Jun 5, 2015
commit 3fe16e89cb527fe63bff3795c4f08ba2f98ad127
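For readers skimming the diff below: the revert drops the broadcast-based plumbing and returns to the pattern where the writer container takes a plain @transient JobConf, wraps it in Spark's SerializableWritable, and reads it back through .value on the executors. A minimal sketch of that pattern follows; the class and method names in the sketch are illustrative only, not taken from this PR.

// Illustrative sketch, assuming Spark 1.x's public SerializableWritable wrapper.
// A Hadoop JobConf is a Writable but not java.io.Serializable, so it cannot be
// captured in a task closure directly; SerializableWritable serializes it through
// Hadoop's ObjectWritable machinery and exposes the deserialized copy via .value.
import org.apache.hadoop.mapred.JobConf
import org.apache.spark.SerializableWritable

class WriterContainerSketch(@transient jobConf: JobConf) extends Serializable {
  // Shipped with the task closure; on the executor, conf.value is the reconstructed JobConf.
  protected val conf = new SerializableWritable(jobConf)

  def compressionCodec: String =
    conf.value.get("mapred.output.compression.codec", "<none>")
}

The hack being reverted instead shipped the configuration as a broadcast variable (sc.broadcast(new SerializableWritable[JobConf](jobConf))) and rebuilt a JobConf per task in executorSideSetup; this commit restores per-closure serialization, as the diff below shows.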
@@ -43,4 +43,4 @@ class SerializableWritable[T <: Writable](@transient var t: T) extends Serializa
     ow.readFields(in)
     t = ow.get().asInstanceOf[T]
   }
-}
+}
@@ -174,14 +174,12 @@ case class InsertIntoHiveTable(

     val jobConf = new JobConf(sc.hiveconf)
     val jobConfSer = new SerializableWritable(jobConf)
-    val broadcastedConf = sc.sparkContext.broadcast(new SerializableWritable[JobConf](jobConf))

     val writerContainer = if (numDynamicPartitions > 0) {
       val dynamicPartColNames = partitionColumnNames.takeRight(numDynamicPartitions)
-      new SparkHiveDynamicPartitionWriterContainer(
-        broadcastedConf, fileSinkConf, dynamicPartColNames)
+      new SparkHiveDynamicPartitionWriterContainer(jobConf, fileSinkConf, dynamicPartColNames)
     } else {
-      new SparkHiveWriterContainer(broadcastedConf, fileSinkConf)
+      new SparkHiveWriterContainer(jobConf, fileSinkConf)
     }

     saveAsHiveFile(child.execute(), outputClass, fileSinkConf, jobConfSer, writerContainer)
@@ -31,10 +31,9 @@ import org.apache.hadoop.io.Writable
 import org.apache.hadoop.mapred._
 import org.apache.hadoop.hive.common.FileUtils

-import org.apache.spark.broadcast.Broadcast
 import org.apache.spark.mapred.SparkHadoopMapRedUtil
 import org.apache.spark.sql.Row
-import org.apache.spark.{SerializableWritable, Logging, SparkHadoopWriter}
+import org.apache.spark.{Logging, SerializableWritable, SparkHadoopWriter}
 import org.apache.spark.sql.catalyst.util.DateUtils
 import org.apache.spark.sql.hive.{ShimFileSinkDesc => FileSinkDesc}
 import org.apache.spark.sql.hive.HiveShim._
@@ -45,7 +44,7 @@ import org.apache.spark.sql.types._
  * It is based on [[SparkHadoopWriter]].
  */
 private[hive] class SparkHiveWriterContainer(
-    jobConf: Broadcast[SerializableWritable[JobConf]],
+    @transient jobConf: JobConf,
     fileSinkConf: FileSinkDesc)
   extends Logging
   with SparkHadoopMapRedUtil
@@ -57,28 +56,22 @@ private[hive] class SparkHiveWriterContainer(
   // handler settings can be set to jobConf
   if (tableDesc != null) {
     PlanUtils.configureOutputJobPropertiesForStorageHandler(tableDesc)
-    Utilities.copyTableJobPropertiesToConf(tableDesc, jobConf.value.value)
+    Utilities.copyTableJobPropertiesToConf(tableDesc, jobConf)
   }
-  @transient var conf: JobConf = jobConf.value.value
+  protected val conf = new SerializableWritable(jobConf)

   private var jobID = 0
   private var splitID = 0
   private var attemptID = 0
-
-  @transient private var jID: JobID = null
-  @transient private var taID: TaskAttemptID = null
-  private var jIDString: String = null
-  private var taskIDString: String = null
-  private var taskAttemptIDString: String = null
+  private var jID: SerializableWritable[JobID] = null
+  private var taID: SerializableWritable[TaskAttemptID] = null

   @transient private var writer: FileSinkOperator.RecordWriter = null
-  @transient protected lazy val committer = conf.getOutputCommitter
-  /** Only used on driver side **/
-  @transient protected lazy val jobContext = newJobContext(conf, jID)
-  /** Only used on executor side */
-  @transient private lazy val taskContext = newTaskAttemptContext(conf, taID)
+  @transient protected lazy val committer = conf.value.getOutputCommitter
+  @transient protected lazy val jobContext = newJobContext(conf.value, jID.value)
+  @transient private lazy val taskContext = newTaskAttemptContext(conf.value, taID.value)
   @transient private lazy val outputFormat =
-    conf.getOutputFormat.asInstanceOf[HiveOutputFormat[AnyRef, Writable]]
+    conf.value.getOutputFormat.asInstanceOf[HiveOutputFormat[AnyRef, Writable]]

   def driverSideSetup() {
     setIDs(0, 0, 0)
@@ -87,7 +80,6 @@ private[hive] class SparkHiveWriterContainer(
   }

   def executorSideSetup(jobId: Int, splitId: Int, attemptId: Int) {
-    conf = new JobConf(jobConf.value.value)
     setIDs(jobId, splitId, attemptId)
     setConfParams()
     committer.setupTask(taskContext)
@@ -98,7 +90,7 @@ private[hive] class SparkHiveWriterContainer(
     val numberFormat = NumberFormat.getInstance()
     numberFormat.setMinimumIntegerDigits(5)
     numberFormat.setGroupingUsed(false)
-    val extension = Utilities.getFileExtension(conf, fileSinkConf.getCompressed, outputFormat)
+    val extension = Utilities.getFileExtension(conf.value, fileSinkConf.getCompressed, outputFormat)
     "part-" + numberFormat.format(splitID) + extension
   }

@@ -118,11 +110,11 @@ private[hive] class SparkHiveWriterContainer(
     // NOTE this method is executed at the executor side.
     // For Hive tables without partitions or with only static partitions, only 1 writer is needed.
     writer = HiveFileFormatUtils.getHiveRecordWriter(
-      conf,
+      conf.value,
       fileSinkConf.getTableInfo,
-      conf.getOutputValueClass.asInstanceOf[Class[Writable]],
+      conf.value.getOutputValueClass.asInstanceOf[Class[Writable]],
       fileSinkConf,
-      FileOutputFormat.getTaskOutputPath(conf, getOutputName),
+      FileOutputFormat.getTaskOutputPath(conf.value, getOutputName),
       Reporter.NULL)
   }

@@ -135,23 +127,17 @@ private[hive] class SparkHiveWriterContainer(
     splitID = splitId
     attemptID = attemptId

-    // note: sparkHadoopwriter.createjobid may be locale-dependent because it doesn't pass a locale
-    // to date format; we should fix this so that its results is location-independent in case
-    // different cluster nodes have different locales (e.g. driver and executor may be different
-    // types of machines with different configurations).
-    jID = SparkHadoopWriter.createJobID(now, jobId)
-    taID = new TaskAttemptID(new TaskID(jID, true, splitID), attemptID)
-    jIDString = jID.toString
-    taskAttemptIDString = taID.toString
-    taskIDString = taID.getTaskID.toString
+    jID = new SerializableWritable[JobID](SparkHadoopWriter.createJobID(now, jobId))
+    taID = new SerializableWritable[TaskAttemptID](
+      new TaskAttemptID(new TaskID(jID.value, true, splitID), attemptID))
   }

   private def setConfParams() {
-    conf.set("mapred.job.id", jIDString)
-    conf.set("mapred.tip.id", taskIDString)
-    conf.set("mapred.task.id", taskAttemptIDString)
-    conf.setBoolean("mapred.task.is.map", true)
-    conf.setInt("mapred.task.partition", splitID)
+    conf.value.set("mapred.job.id", jID.value.toString)
+    conf.value.set("mapred.tip.id", taID.value.getTaskID.toString)
+    conf.value.set("mapred.task.id", taID.value.toString)
+    conf.value.setBoolean("mapred.task.is.map", true)
+    conf.value.setInt("mapred.task.partition", splitID)
   }
 }

@@ -174,14 +160,14 @@ private[spark] object SparkHiveDynamicPartitionWriterContainer {
 }

 private[spark] class SparkHiveDynamicPartitionWriterContainer(
-    jobConf: Broadcast[SerializableWritable[JobConf]],
+    @transient jobConf: JobConf,
     fileSinkConf: FileSinkDesc,
     dynamicPartColNames: Array[String])
   extends SparkHiveWriterContainer(jobConf, fileSinkConf) {

   import SparkHiveDynamicPartitionWriterContainer._

-  private val defaultPartName = jobConf.value.value.get(
+  private val defaultPartName = jobConf.get(
     ConfVars.DEFAULTPARTITIONNAME.varname, ConfVars.DEFAULTPARTITIONNAME.defaultVal)

   @transient private var writers: mutable.HashMap[String, FileSinkOperator.RecordWriter] = _
@@ -205,10 +191,10 @@ private[spark] class SparkHiveDynamicPartitionWriterContainer(
     // Better solution is to add a step similar to what Hive FileSinkOperator.jobCloseOp does:
     // calling something like Utilities.mvFileToFinalPath to cleanup the output directory and then
     // load it with loadDynamicPartitions/loadPartition/loadTable.
-    val oldMarker = jobConf.value.value.getBoolean(SUCCESSFUL_JOB_OUTPUT_DIR_MARKER, true)
-    jobConf.value.value.setBoolean(SUCCESSFUL_JOB_OUTPUT_DIR_MARKER, false)
+    val oldMarker = jobConf.getBoolean(SUCCESSFUL_JOB_OUTPUT_DIR_MARKER, true)
+    jobConf.setBoolean(SUCCESSFUL_JOB_OUTPUT_DIR_MARKER, false)
     super.commitJob()
-    jobConf.value.value.setBoolean(SUCCESSFUL_JOB_OUTPUT_DIR_MARKER, oldMarker)
+    jobConf.setBoolean(SUCCESSFUL_JOB_OUTPUT_DIR_MARKER, oldMarker)
   }

   override def getLocalFileWriter(row: Row, schema: StructType): FileSinkOperator.RecordWriter = {
@@ -243,16 +229,16 @@ private[spark] class SparkHiveDynamicPartitionWriterContainer(
       newFileSinkDesc.setCompressType(fileSinkConf.getCompressType)

       val path = {
-        val outputPath = FileOutputFormat.getOutputPath(conf)
+        val outputPath = FileOutputFormat.getOutputPath(conf.value)
         assert(outputPath != null, "Undefined job output-path")
         val workPath = new Path(outputPath, dynamicPartPath.stripPrefix("/"))
         new Path(workPath, getOutputName)
       }

       HiveFileFormatUtils.getHiveRecordWriter(
-        conf,
+        conf.value,
         fileSinkConf.getTableInfo,
-        conf.getOutputValueClass.asInstanceOf[Class[Writable]],
+        conf.value.getOutputValueClass.asInstanceOf[Class[Writable]],
         newFileSinkDesc,
         path,
         Reporter.NULL)