From 5fd215f7b6012c818f39402fff6cae8d19338b90 Mon Sep 17 00:00:00 2001 From: Liang-Chi Hsieh Date: Sat, 14 Jan 2017 06:23:32 +0000 Subject: [PATCH 1/5] Fix InputFileBlock for HadoopRDD. --- .../org/apache/spark/rdd/HadoopRDD.scala | 24 ++++++++++++------- .../org/apache/spark/rdd/NewHadoopRDD.scala | 24 ++++++++++++------- 2 files changed, 32 insertions(+), 16 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala b/core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala index a83e139c13ee..abc447d28e7f 100644 --- a/core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala +++ b/core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala @@ -217,14 +217,6 @@ class HadoopRDD[K, V]( private val inputMetrics = context.taskMetrics().inputMetrics private val existingBytesRead = inputMetrics.bytesRead - // Sets InputFileBlockHolder for the file block's information - split.inputSplit.value match { - case fs: FileSplit => - InputFileBlockHolder.set(fs.getPath.toString, fs.getStart, fs.getLength) - case _ => - InputFileBlockHolder.unset() - } - // Find a function that will return the FileSystem bytes read by this thread. Do this before // creating RecordReader, because RecordReader's constructor might read some bytes private val getBytesReadCallback: Option[() => Long] = split.inputSplit.value match { @@ -263,7 +255,23 @@ class HadoopRDD[K, V]( private val key: K = if (reader == null) null.asInstanceOf[K] else reader.createKey() private val value: V = if (reader == null) null.asInstanceOf[V] else reader.createValue() + private var setInputFileBlockHolder: Boolean = false + override def getNext(): (K, V) = { + if (!setInputFileBlockHolder) { + // Sets InputFileBlockHolder for the file block's information + // We can't set it before consuming this iterator, otherwise some expressions which + // use thread local variables will fail when working with Python UDF. That is because + // the batch of Python UDF is running in individual thread. + split.inputSplit.value match { + case fs: FileSplit => + InputFileBlockHolder.set(fs.getPath.toString, fs.getStart, fs.getLength) + case _ => + InputFileBlockHolder.unset() + } + setInputFileBlockHolder = true + } + try { finished = !reader.next(key, value) } catch { diff --git a/core/src/main/scala/org/apache/spark/rdd/NewHadoopRDD.scala b/core/src/main/scala/org/apache/spark/rdd/NewHadoopRDD.scala index 733e85f305f0..aa65d619d6b1 100644 --- a/core/src/main/scala/org/apache/spark/rdd/NewHadoopRDD.scala +++ b/core/src/main/scala/org/apache/spark/rdd/NewHadoopRDD.scala @@ -139,14 +139,6 @@ class NewHadoopRDD[K, V]( private val inputMetrics = context.taskMetrics().inputMetrics private val existingBytesRead = inputMetrics.bytesRead - // Sets InputFileBlockHolder for the file block's information - split.serializableHadoopSplit.value match { - case fs: FileSplit => - InputFileBlockHolder.set(fs.getPath.toString, fs.getStart, fs.getLength) - case _ => - InputFileBlockHolder.unset() - } - // Find a function that will return the FileSystem bytes read by this thread. Do this before // creating RecordReader, because RecordReader's constructor might read some bytes private val getBytesReadCallback: Option[() => Long] = @@ -217,7 +209,23 @@ class NewHadoopRDD[K, V]( !finished } + private var setInputFileBlockHolder: Boolean = false + override def next(): (K, V) = { + if (!setInputFileBlockHolder) { + // Sets InputFileBlockHolder for the file block's information. 
+ // We can't set it before consuming this iterator, otherwise some expressions which + // use thread local variables will fail when working with Python UDF. That is because + // the batch of Python UDF is running in individual thread. + split.serializableHadoopSplit.value match { + case fs: FileSplit => + InputFileBlockHolder.set(fs.getPath.toString, fs.getStart, fs.getLength) + case _ => + InputFileBlockHolder.unset() + } + setInputFileBlockHolder = true + } + if (!hasNext) { throw new java.util.NoSuchElementException("End of stream") } From 1563e03796a1ee557decfa041d39dbd5eee8cf33 Mon Sep 17 00:00:00 2001 From: Liang-Chi Hsieh Date: Tue, 17 Jan 2017 05:31:47 +0000 Subject: [PATCH 2/5] Address comment. --- .../org/apache/spark/rdd/HadoopRDD.scala | 24 +++++++------------ .../spark/rdd/InputFileBlockHolder.scala | 7 +++--- .../org/apache/spark/rdd/NewHadoopRDD.scala | 24 +++++++------------ 3 files changed, 20 insertions(+), 35 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala b/core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala index abc447d28e7f..a83e139c13ee 100644 --- a/core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala +++ b/core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala @@ -217,6 +217,14 @@ class HadoopRDD[K, V]( private val inputMetrics = context.taskMetrics().inputMetrics private val existingBytesRead = inputMetrics.bytesRead + // Sets InputFileBlockHolder for the file block's information + split.inputSplit.value match { + case fs: FileSplit => + InputFileBlockHolder.set(fs.getPath.toString, fs.getStart, fs.getLength) + case _ => + InputFileBlockHolder.unset() + } + // Find a function that will return the FileSystem bytes read by this thread. Do this before // creating RecordReader, because RecordReader's constructor might read some bytes private val getBytesReadCallback: Option[() => Long] = split.inputSplit.value match { @@ -255,23 +263,7 @@ class HadoopRDD[K, V]( private val key: K = if (reader == null) null.asInstanceOf[K] else reader.createKey() private val value: V = if (reader == null) null.asInstanceOf[V] else reader.createValue() - private var setInputFileBlockHolder: Boolean = false - override def getNext(): (K, V) = { - if (!setInputFileBlockHolder) { - // Sets InputFileBlockHolder for the file block's information - // We can't set it before consuming this iterator, otherwise some expressions which - // use thread local variables will fail when working with Python UDF. That is because - // the batch of Python UDF is running in individual thread. - split.inputSplit.value match { - case fs: FileSplit => - InputFileBlockHolder.set(fs.getPath.toString, fs.getStart, fs.getLength) - case _ => - InputFileBlockHolder.unset() - } - setInputFileBlockHolder = true - } - try { finished = !reader.next(key, value) } catch { diff --git a/core/src/main/scala/org/apache/spark/rdd/InputFileBlockHolder.scala b/core/src/main/scala/org/apache/spark/rdd/InputFileBlockHolder.scala index 9ba476d2ba26..ff2f58d81142 100644 --- a/core/src/main/scala/org/apache/spark/rdd/InputFileBlockHolder.scala +++ b/core/src/main/scala/org/apache/spark/rdd/InputFileBlockHolder.scala @@ -41,9 +41,10 @@ private[spark] object InputFileBlockHolder { * The thread variable for the name of the current file being read. This is used by * the InputFileName function in Spark SQL. 
*/ - private[this] val inputBlock: ThreadLocal[FileBlock] = new ThreadLocal[FileBlock] { - override protected def initialValue(): FileBlock = new FileBlock - } + private[this] val inputBlock: InheritableThreadLocal[FileBlock] = + new InheritableThreadLocal[FileBlock] { + override protected def initialValue(): FileBlock = new FileBlock + } /** * Returns the holding file name or empty string if it is unknown. diff --git a/core/src/main/scala/org/apache/spark/rdd/NewHadoopRDD.scala b/core/src/main/scala/org/apache/spark/rdd/NewHadoopRDD.scala index aa65d619d6b1..733e85f305f0 100644 --- a/core/src/main/scala/org/apache/spark/rdd/NewHadoopRDD.scala +++ b/core/src/main/scala/org/apache/spark/rdd/NewHadoopRDD.scala @@ -139,6 +139,14 @@ class NewHadoopRDD[K, V]( private val inputMetrics = context.taskMetrics().inputMetrics private val existingBytesRead = inputMetrics.bytesRead + // Sets InputFileBlockHolder for the file block's information + split.serializableHadoopSplit.value match { + case fs: FileSplit => + InputFileBlockHolder.set(fs.getPath.toString, fs.getStart, fs.getLength) + case _ => + InputFileBlockHolder.unset() + } + // Find a function that will return the FileSystem bytes read by this thread. Do this before // creating RecordReader, because RecordReader's constructor might read some bytes private val getBytesReadCallback: Option[() => Long] = @@ -209,23 +217,7 @@ class NewHadoopRDD[K, V]( !finished } - private var setInputFileBlockHolder: Boolean = false - override def next(): (K, V) = { - if (!setInputFileBlockHolder) { - // Sets InputFileBlockHolder for the file block's information. - // We can't set it before consuming this iterator, otherwise some expressions which - // use thread local variables will fail when working with Python UDF. That is because - // the batch of Python UDF is running in individual thread. - split.serializableHadoopSplit.value match { - case fs: FileSplit => - InputFileBlockHolder.set(fs.getPath.toString, fs.getStart, fs.getLength) - case _ => - InputFileBlockHolder.unset() - } - setInputFileBlockHolder = true - } - if (!hasNext) { throw new java.util.NoSuchElementException("End of stream") } From 8380617c6412ea771c1078041066afefb0e7eae2 Mon Sep 17 00:00:00 2001 From: Liang-Chi Hsieh Date: Tue, 17 Jan 2017 14:03:27 +0000 Subject: [PATCH 3/5] Add test case. --- .../spark/rdd/InputFileBlockHolderSuite.scala | 75 +++++++++++++++++++ 1 file changed, 75 insertions(+) create mode 100644 core/src/test/scala/org/apache/spark/rdd/InputFileBlockHolderSuite.scala diff --git a/core/src/test/scala/org/apache/spark/rdd/InputFileBlockHolderSuite.scala b/core/src/test/scala/org/apache/spark/rdd/InputFileBlockHolderSuite.scala new file mode 100644 index 000000000000..b12752006f0d --- /dev/null +++ b/core/src/test/scala/org/apache/spark/rdd/InputFileBlockHolderSuite.scala @@ -0,0 +1,75 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.rdd + +import java.io.File + +import scala.collection.mutable.ArrayBuffer + +import org.apache.hadoop.fs.Path +import org.apache.hadoop.io._ +import org.apache.hadoop.mapreduce.lib.input.{TextInputFormat => NewTextInputFormat} + +import org.apache.spark.{SparkContext, SparkFunSuite} +import org.apache.spark.util.Utils + +class InputFileBlockHolderSuite extends SparkFunSuite { + test("InputFileBlockHolder should work with both HadoopRDD and NewHadoopRDD when in new thread") { + val sc = new SparkContext("local", "test") + val tempDir: File = Utils.createTempDir() + val outDir = new File(tempDir, "output").getAbsolutePath + sc.makeRDD(1 to 4, 1).saveAsTextFile(outDir) + + // Test HadoopRDD + val filename1 = sc.textFile(outDir).mapPartitions { iter => + // Consume the iterator so InputFileBlockHolder will be set. + iter.next() + val output = new ArrayBuffer[String]() + val thread = new TestThread(output) + thread.start() + thread.join() + output.toIterator + }.collect().head + + assert(filename1.endsWith("part-00000")) + + // Test NewHadoopRDD + val filename2 = + sc.newAPIHadoopFile[LongWritable, Text, NewTextInputFormat](outDir).mapPartitions { iter => + // Consume the iterator so InputFileBlockHolder will be set. + iter.next() + val output = new ArrayBuffer[String]() + val thread = new TestThread(output) + thread.start() + thread.join() + output.toIterator + }.collect().head + + assert(filename2.endsWith("part-00000")) + + Utils.deleteRecursively(tempDir) + sc.stop() + } +} + +private class TestThread(output: ArrayBuffer[String]) + extends Thread("test thread for InputFileBlockHolder") { + override def run(): Unit = { + output += InputFileBlockHolder.getInputFilePath.toString + } +} From 2ce65cb8336b32d8309f189e2c63a576c5a60ee5 Mon Sep 17 00:00:00 2001 From: Liang-Chi Hsieh Date: Wed, 18 Jan 2017 02:36:23 +0000 Subject: [PATCH 4/5] Replace test case with a pyspark one. --- .../spark/rdd/InputFileBlockHolderSuite.scala | 75 ------------------- python/pyspark/sql/tests.py | 25 +++++++ 2 files changed, 25 insertions(+), 75 deletions(-) delete mode 100644 core/src/test/scala/org/apache/spark/rdd/InputFileBlockHolderSuite.scala diff --git a/core/src/test/scala/org/apache/spark/rdd/InputFileBlockHolderSuite.scala b/core/src/test/scala/org/apache/spark/rdd/InputFileBlockHolderSuite.scala deleted file mode 100644 index b12752006f0d..000000000000 --- a/core/src/test/scala/org/apache/spark/rdd/InputFileBlockHolderSuite.scala +++ /dev/null @@ -1,75 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package org.apache.spark.rdd - -import java.io.File - -import scala.collection.mutable.ArrayBuffer - -import org.apache.hadoop.fs.Path -import org.apache.hadoop.io._ -import org.apache.hadoop.mapreduce.lib.input.{TextInputFormat => NewTextInputFormat} - -import org.apache.spark.{SparkContext, SparkFunSuite} -import org.apache.spark.util.Utils - -class InputFileBlockHolderSuite extends SparkFunSuite { - test("InputFileBlockHolder should work with both HadoopRDD and NewHadoopRDD when in new thread") { - val sc = new SparkContext("local", "test") - val tempDir: File = Utils.createTempDir() - val outDir = new File(tempDir, "output").getAbsolutePath - sc.makeRDD(1 to 4, 1).saveAsTextFile(outDir) - - // Test HadoopRDD - val filename1 = sc.textFile(outDir).mapPartitions { iter => - // Consume the iterator so InputFileBlockHolder will be set. - iter.next() - val output = new ArrayBuffer[String]() - val thread = new TestThread(output) - thread.start() - thread.join() - output.toIterator - }.collect().head - - assert(filename1.endsWith("part-00000")) - - // Test NewHadoopRDD - val filename2 = - sc.newAPIHadoopFile[LongWritable, Text, NewTextInputFormat](outDir).mapPartitions { iter => - // Consume the iterator so InputFileBlockHolder will be set. - iter.next() - val output = new ArrayBuffer[String]() - val thread = new TestThread(output) - thread.start() - thread.join() - output.toIterator - }.collect().head - - assert(filename2.endsWith("part-00000")) - - Utils.deleteRecursively(tempDir) - sc.stop() - } -} - -private class TestThread(output: ArrayBuffer[String]) - extends Thread("test thread for InputFileBlockHolder") { - override def run(): Unit = { - output += InputFileBlockHolder.getInputFilePath.toString - } -} diff --git a/python/pyspark/sql/tests.py b/python/pyspark/sql/tests.py index a8250281dab3..66958bdacb4e 100644 --- a/python/pyspark/sql/tests.py +++ b/python/pyspark/sql/tests.py @@ -435,6 +435,31 @@ def test_udf_with_input_file_name(self): row = self.spark.read.json(filePath).select(sourceFile(input_file_name())).first() self.assertTrue(row[0].find("people1.json") != -1) + def test_udf_with_input_file_name_for_hadooprdd(self): + from pyspark.sql.functions import udf, input_file_name + from pyspark.sql.types import StringType + + def filename(path): + return path + + self.spark.udf.register('sameText', filename) + sameText = udf(filename, StringType()) + + rdd = self.sc.textFile('python/test_support/sql/people.json') + df = self.spark.read.json(rdd).select(input_file_name().alias('file')) + row = df.select(sameText(df['file'])).first() + self.assertTrue(row[0].find("people.json") != -1) + + rdd2 = self.sc.newAPIHadoopFile( + 'python/test_support/sql/people.json', + 'org.apache.hadoop.mapreduce.lib.input.TextInputFormat', + 'org.apache.hadoop.io.LongWritable', + 'org.apache.hadoop.io.Text') + + df2 = self.spark.read.json(rdd2).select(input_file_name().alias('file')) + row = df2.select(sameText(df2['file'])).first() + self.assertTrue(row[0].find("people.json") != -1) + def test_basic_functions(self): rdd = self.sc.parallelize(['{"foo":"bar"}', '{"foo":"baz"}']) df = self.spark.read.json(rdd) From 2b61d472a74766d4a1c2af4cf2278a87b7b12698 Mon Sep 17 00:00:00 2001 From: Liang-Chi Hsieh Date: Wed, 18 Jan 2017 06:34:06 +0000 Subject: [PATCH 5/5] For comment. 
--- python/pyspark/sql/tests.py | 5 ++--- 1 file changed, 2 insertions(+), 3 deletions(-) diff --git a/python/pyspark/sql/tests.py b/python/pyspark/sql/tests.py index 66958bdacb4e..73a5df65e0ab 100644 --- a/python/pyspark/sql/tests.py +++ b/python/pyspark/sql/tests.py @@ -442,7 +442,6 @@ def test_udf_with_input_file_name_for_hadooprdd(self): def filename(path): return path - self.spark.udf.register('sameText', filename) sameText = udf(filename, StringType()) rdd = self.sc.textFile('python/test_support/sql/people.json') @@ -457,8 +456,8 @@ def filename(path): 'org.apache.hadoop.io.Text') df2 = self.spark.read.json(rdd2).select(input_file_name().alias('file')) - row = df2.select(sameText(df2['file'])).first() - self.assertTrue(row[0].find("people.json") != -1) + row2 = df2.select(sameText(df2['file'])).first() + self.assertTrue(row2[0].find("people.json") != -1) def test_basic_functions(self): rdd = self.sc.parallelize(['{"foo":"bar"}', '{"foo":"baz"}'])
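
Note on the mechanism behind PATCH 2: InputFileBlockHolder.inputBlock changes from a plain ThreadLocal to a java.lang.InheritableThreadLocal, so the file/block information the task thread records via InputFileBlockHolder.set is also visible in any child thread it spawns, such as the separate thread that evaluates a batch of Python UDFs (the situation described in the comments PATCH 1 added and PATCH 2 removes again). The self-contained Scala sketch below is illustrative only and is not part of these patches; the ThreadLocalInheritanceDemo object and its field names are made up for the example. It shows the JDK behaviour the fix relies on: a plain ThreadLocal falls back to its initial value in a child thread, while an InheritableThreadLocal carries the parent's value across.

object ThreadLocalInheritanceDemo {
  // Mirrors the pre-PATCH-2 holder: every thread starts from initialValue.
  private val plain = new ThreadLocal[String] {
    override protected def initialValue(): String = "unset"
  }

  // Mirrors the post-PATCH-2 holder: a child thread inherits the value the
  // parent held at the moment the child was created.
  private val inheritable = new InheritableThreadLocal[String] {
    override protected def initialValue(): String = "unset"
  }

  def main(args: Array[String]): Unit = {
    // Simulate the task thread recording the current file split, as
    // InputFileBlockHolder.set does for a FileSplit.
    plain.set("part-00000")
    inheritable.set("part-00000")

    val child = new Thread(new Runnable {
      override def run(): Unit = {
        println(s"plain       = ${plain.get()}")       // prints "unset"
        println(s"inheritable = ${inheritable.get()}") // prints "part-00000"
      }
    })
    child.start()
    child.join()
  }
}

Run on its own, the sketch prints "unset" for the plain holder and "part-00000" for the inheritable one. That difference is why, before PATCH 2, input_file_name() evaluated on the Python-UDF thread would fall back to the holder's empty default, and why it returns the real file name afterwards, which is what the pyspark test added in PATCH 4 and trimmed in PATCH 5 asserts.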