
Commit 95daff6

[SPARK-11646] WholeTextFileRDD should return Text rather than String
If it returns Text, we can reuse this in Spark SQL to provide a WholeTextFile data source and directly convert the Text into UTF8String without extra string decoding and encoding.

Author: Reynold Xin <[email protected]>

Closes #9622 from rxin/SPARK-11646.
1 parent 27524a3 commit 95daff6
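
The commit message describes a direct bytes-to-UTF8String path. A minimal illustrative sketch, not part of this commit: it assumes the spark-unsafe UTF8String.fromBytes API and a hypothetical data source that consumes the Text values produced by WholeTextFileRDD.

import org.apache.hadoop.io.Text
import org.apache.spark.unsafe.types.UTF8String

// Hypothetical helper a WholeTextFile data source could use: wrap the raw
// UTF-8 bytes held by the Text value directly, skipping the
// Text -> String -> UTF8String round trip.
def textToUTF8String(value: Text): UTF8String = {
  // Text.getBytes returns the backing array; only the first getLength bytes are valid.
  UTF8String.fromBytes(value.getBytes, 0, value.getLength)
}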

File tree: 5 files changed (+69 -44 lines)


core/src/main/scala/org/apache/spark/SparkContext.scala

Lines changed: 3 additions & 3 deletions

@@ -863,10 +863,10 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationClient
     new WholeTextFileRDD(
       this,
       classOf[WholeTextFileInputFormat],
-      classOf[String],
-      classOf[String],
+      classOf[Text],
+      classOf[Text],
       updateConf,
-      minPartitions).setName(path)
+      minPartitions).setName(path).map(record => (record._1.toString, record._2.toString))
   }

   /**
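
The change is internal to the RDD: wholeTextFiles still returns RDD[(String, String)] because the Text key/value pairs are mapped back to Strings above. A small usage sketch (sc is assumed to be an existing SparkContext and the input path is hypothetical):

// Each record is (file path, entire file contents).
val files = sc.wholeTextFiles("hdfs://some/dir")   // RDD[(String, String)]
files.mapValues(_.length).collect()                // e.g. content length per file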

core/src/main/scala/org/apache/spark/input/WholeTextFileInputFormat.scala

Lines changed: 3 additions & 3 deletions

@@ -20,6 +20,7 @@ package org.apache.spark.input
 import scala.collection.JavaConverters._

 import org.apache.hadoop.fs.Path
+import org.apache.hadoop.io.Text
 import org.apache.hadoop.mapreduce.InputSplit
 import org.apache.hadoop.mapreduce.JobContext
 import org.apache.hadoop.mapreduce.lib.input.CombineFileInputFormat
@@ -33,14 +34,13 @@ import org.apache.hadoop.mapreduce.TaskAttemptContext
  */
 private[spark] class WholeTextFileInputFormat
-  extends CombineFileInputFormat[String, String] with Configurable {
+  extends CombineFileInputFormat[Text, Text] with Configurable {

   override protected def isSplitable(context: JobContext, file: Path): Boolean = false

   override def createRecordReader(
       split: InputSplit,
-      context: TaskAttemptContext): RecordReader[String, String] = {
-
+      context: TaskAttemptContext): RecordReader[Text, Text] = {
     val reader =
       new ConfigurableCombineFileRecordReader(split, context, classOf[WholeTextFileRecordReader])
     reader.setConf(getConf)

core/src/main/scala/org/apache/spark/input/WholeTextFileRecordReader.scala

Lines changed: 6 additions & 6 deletions

@@ -49,7 +49,7 @@ private[spark] class WholeTextFileRecordReader(
     split: CombineFileSplit,
     context: TaskAttemptContext,
     index: Integer)
-  extends RecordReader[String, String] with Configurable {
+  extends RecordReader[Text, Text] with Configurable {

   private[this] val path = split.getPath(index)
   private[this] val fs = path.getFileSystem(
@@ -58,18 +58,18 @@ private[spark] class WholeTextFileRecordReader(
   // True means the current file has been processed, then skip it.
   private[this] var processed = false

-  private[this] val key = path.toString
-  private[this] var value: String = null
+  private[this] val key: Text = new Text(path.toString)
+  private[this] var value: Text = null

   override def initialize(split: InputSplit, context: TaskAttemptContext): Unit = {}

   override def close(): Unit = {}

   override def getProgress: Float = if (processed) 1.0f else 0.0f

-  override def getCurrentKey: String = key
+  override def getCurrentKey: Text = key

-  override def getCurrentValue: String = value
+  override def getCurrentValue: Text = value

   override def nextKeyValue(): Boolean = {
     if (!processed) {
@@ -83,7 +83,7 @@ private[spark] class WholeTextFileRecordReader(
         ByteStreams.toByteArray(fileIn)
       }

-      value = new Text(innerBuffer).toString
+      value = new Text(innerBuffer)
       Closeables.close(fileIn, false)
       processed = true
       true

core/src/main/scala/org/apache/spark/rdd/NewHadoopRDD.scala

Lines changed: 1 addition & 32 deletions

@@ -28,12 +28,11 @@ import org.apache.hadoop.mapreduce._
 import org.apache.hadoop.mapreduce.lib.input.{CombineFileSplit, FileSplit}

 import org.apache.spark.annotation.DeveloperApi
-import org.apache.spark.input.WholeTextFileInputFormat
 import org.apache.spark._
 import org.apache.spark.executor.DataReadMethod
 import org.apache.spark.mapreduce.SparkHadoopMapReduceUtil
 import org.apache.spark.rdd.NewHadoopRDD.NewHadoopMapPartitionsWithSplitRDD
-import org.apache.spark.util.{SerializableConfiguration, ShutdownHookManager, Utils}
+import org.apache.spark.util.{SerializableConfiguration, ShutdownHookManager}
 import org.apache.spark.deploy.SparkHadoopUtil
 import org.apache.spark.storage.StorageLevel

@@ -59,7 +58,6 @@ private[spark] class NewHadoopPartition(
  * @param inputFormatClass Storage format of the data to be read.
  * @param keyClass Class of the key associated with the inputFormatClass.
  * @param valueClass Class of the value associated with the inputFormatClass.
- * @param conf The Hadoop configuration.
  */
 @DeveloperApi
 class NewHadoopRDD[K, V](
@@ -282,32 +280,3 @@ private[spark] object NewHadoopRDD {
     }
   }
 }
-
-private[spark] class WholeTextFileRDD(
-    sc : SparkContext,
-    inputFormatClass: Class[_ <: WholeTextFileInputFormat],
-    keyClass: Class[String],
-    valueClass: Class[String],
-    conf: Configuration,
-    minPartitions: Int)
-  extends NewHadoopRDD[String, String](sc, inputFormatClass, keyClass, valueClass, conf) {
-
-  override def getPartitions: Array[Partition] = {
-    val inputFormat = inputFormatClass.newInstance
-    val conf = getConf
-    inputFormat match {
-      case configurable: Configurable =>
-        configurable.setConf(conf)
-      case _ =>
-    }
-    val jobContext = newJobContext(conf, jobId)
-    inputFormat.setMinPartitions(jobContext, minPartitions)
-    val rawSplits = inputFormat.getSplits(jobContext).toArray
-    val result = new Array[Partition](rawSplits.size)
-    for (i <- 0 until rawSplits.size) {
-      result(i) = new NewHadoopPartition(id, i, rawSplits(i).asInstanceOf[InputSplit with Writable])
-    }
-    result
-  }
-}
-

core/src/main/scala/org/apache/spark/rdd/WholeTextFileRDD.scala

Lines changed: 56 additions & 0 deletions

@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.rdd
+
+import org.apache.hadoop.conf.{Configurable, Configuration}
+import org.apache.hadoop.io.{Text, Writable}
+import org.apache.hadoop.mapreduce.InputSplit
+
+import org.apache.spark.{Partition, SparkContext}
+import org.apache.spark.input.WholeTextFileInputFormat
+
+/**
+ * An RDD that reads a bunch of text files in, and each text file becomes one record.
+ */
+private[spark] class WholeTextFileRDD(
+    sc : SparkContext,
+    inputFormatClass: Class[_ <: WholeTextFileInputFormat],
+    keyClass: Class[Text],
+    valueClass: Class[Text],
+    conf: Configuration,
+    minPartitions: Int)
+  extends NewHadoopRDD[Text, Text](sc, inputFormatClass, keyClass, valueClass, conf) {
+
+  override def getPartitions: Array[Partition] = {
+    val inputFormat = inputFormatClass.newInstance
+    val conf = getConf
+    inputFormat match {
+      case configurable: Configurable =>
+        configurable.setConf(conf)
+      case _ =>
+    }
+    val jobContext = newJobContext(conf, jobId)
+    inputFormat.setMinPartitions(jobContext, minPartitions)
+    val rawSplits = inputFormat.getSplits(jobContext).toArray
+    val result = new Array[Partition](rawSplits.size)
+    for (i <- 0 until rawSplits.size) {
+      result(i) = new NewHadoopPartition(id, i, rawSplits(i).asInstanceOf[InputSplit with Writable])
+    }
+    result
+  }
+}

0 commit comments