From f945b641c70790a82c864ec752b673b89bb4310f Mon Sep 17 00:00:00 2001 From: robbins Date: Thu, 3 Sep 2015 13:48:35 -0700 Subject: [PATCH 01/15] [SPARK-9869] [STREAMING] Wait for all event notifications before asserting results Author: robbins Closes #8589 from robbinspg/InputStreamSuite-fix. (cherry picked from commit 754f853b02e9fd221f138c2446445fd56e3f3fb3) Signed-off-by: Andrew Or --- .../scala/org/apache/spark/streaming/InputStreamsSuite.scala | 3 +++ 1 file changed, 3 insertions(+) diff --git a/streaming/src/test/scala/org/apache/spark/streaming/InputStreamsSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/InputStreamsSuite.scala index ec2852d9a020..047e38ef9099 100644 --- a/streaming/src/test/scala/org/apache/spark/streaming/InputStreamsSuite.scala +++ b/streaming/src/test/scala/org/apache/spark/streaming/InputStreamsSuite.scala @@ -76,6 +76,9 @@ class InputStreamsSuite extends TestSuiteBase with BeforeAndAfter { fail("Timeout: cannot finish all batches in 30 seconds") } + // Ensure progress listener has been notified of all events + ssc.scheduler.listenerBus.waitUntilEmpty(500) + // Verify all "InputInfo"s have been reported assert(ssc.progressListener.numTotalReceivedRecords === input.size) assert(ssc.progressListener.numTotalProcessedRecords === input.size) From 4d63335972eb7167c3c53db64a1e47fd24875287 Mon Sep 17 00:00:00 2001 From: robbins Date: Thu, 3 Sep 2015 13:47:22 -0700 Subject: [PATCH 02/15] [SPARK-10431] [CORE] Fix intermittent test failure. Wait for event queue to be clear Author: robbins Closes #8582 from robbinspg/InputOutputMetricsSuite. --- .../org/apache/spark/metrics/InputOutputMetricsSuite.scala | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/core/src/test/scala/org/apache/spark/metrics/InputOutputMetricsSuite.scala b/core/src/test/scala/org/apache/spark/metrics/InputOutputMetricsSuite.scala index d3218a548efc..44eb5a046912 100644 --- a/core/src/test/scala/org/apache/spark/metrics/InputOutputMetricsSuite.scala +++ b/core/src/test/scala/org/apache/spark/metrics/InputOutputMetricsSuite.scala @@ -286,6 +286,10 @@ class InputOutputMetricsSuite extends SparkFunSuite with SharedSparkContext private def runAndReturnMetrics(job: => Unit, collector: (SparkListenerTaskEnd) => Option[Long]): Long = { val taskMetrics = new ArrayBuffer[Long]() + + // Avoid receiving earlier taskEnd events + sc.listenerBus.waitUntilEmpty(500) + sc.addSparkListener(new SparkListener() { override def onTaskEnd(taskEnd: SparkListenerTaskEnd) { collector(taskEnd).foreach(taskMetrics += _) From 09e08dbc15d470b546805d4c39ad78cb5f8e3837 Mon Sep 17 00:00:00 2001 From: robbins Date: Fri, 4 Sep 2015 15:23:29 -0700 Subject: [PATCH 03/15] [SPARK-10454] [SPARK CORE] wait for empty event queue Author: robbins Closes #8605 from robbinspg/DAGSchedulerSuite-fix. (cherry picked from commit 2e1c17553d179f2d26a165805622cc01f92081b9) Signed-off-by: Andrew Or --- .../scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala | 1 + 1 file changed, 1 insertion(+) diff --git a/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala index 9c947518941b..ed481b1374c4 100644 --- a/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala +++ b/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala @@ -527,6 +527,7 @@ class DAGSchedulerSuite } // The map stage should have been submitted. 
+ sc.listenerBus.waitUntilEmpty(WAIT_TIMEOUT_MILLIS) assert(countSubmittedMapStageAttempts() === 1) complete(taskSets(0), Seq( From dc3965831adbb54892f7aca4f726e83fdfbec65d Mon Sep 17 00:00:00 2001 From: xutingjun Date: Fri, 4 Sep 2015 15:40:02 -0700 Subject: [PATCH 04/15] [SPARK-10311] [STREAMING] Reload appId and attemptId when app starts with checkpoint file in cluster mode Author: xutingjun Closes #8477 from XuTingjun/streaming-attempt. --- .../src/main/scala/org/apache/spark/streaming/Checkpoint.scala | 2 ++ 1 file changed, 2 insertions(+) diff --git a/streaming/src/main/scala/org/apache/spark/streaming/Checkpoint.scala b/streaming/src/main/scala/org/apache/spark/streaming/Checkpoint.scala index cd5d960369c0..3985e1a3d9df 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/Checkpoint.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/Checkpoint.scala @@ -49,6 +49,8 @@ class Checkpoint(@transient ssc: StreamingContext, val checkpointTime: Time) // Reload properties for the checkpoint application since user wants to set a reload property // or spark had changed its value and user wants to set it back. val propertiesToReload = List( + "spark.yarn.app.id", + "spark.yarn.app.attemptId", "spark.driver.host", "spark.driver.port", "spark.master", From cfc5f6f14930a3731d1c4c131233407f0ccabcb2 Mon Sep 17 00:00:00 2001 From: Holden Karau Date: Fri, 4 Sep 2015 17:32:35 -0700 Subject: [PATCH 05/15] [SPARK-10402] [DOCS] [ML] Add defaults to the scaladoc for params in ml/ We should make sure the scaladoc for params includes their default values through the models in ml/ Author: Holden Karau Closes #8591 from holdenk/SPARK-10402-add-scaladoc-for-default-values-of-params-in-ml. (cherry picked from commit 22eab706f4a1459100f9e97fc557a7f3c88ca10e) Signed-off-by: Joseph K. Bradley --- .../ml/classification/MultilayerPerceptronClassifier.scala | 2 ++ .../spark/ml/evaluation/BinaryClassificationEvaluator.scala | 1 + .../main/scala/org/apache/spark/ml/feature/Binarizer.scala | 1 + mllib/src/main/scala/org/apache/spark/ml/feature/IDF.scala | 1 + .../scala/org/apache/spark/ml/feature/StopWordsRemover.scala | 2 ++ .../scala/org/apache/spark/ml/feature/StringIndexer.scala | 4 ++-- .../scala/org/apache/spark/ml/feature/VectorIndexer.scala | 1 + .../main/scala/org/apache/spark/ml/feature/VectorSlicer.scala | 2 ++ .../src/main/scala/org/apache/spark/ml/feature/Word2Vec.scala | 3 +++ .../org/apache/spark/ml/regression/IsotonicRegression.scala | 1 + 10 files changed, 16 insertions(+), 2 deletions(-) diff --git a/mllib/src/main/scala/org/apache/spark/ml/classification/MultilayerPerceptronClassifier.scala b/mllib/src/main/scala/org/apache/spark/ml/classification/MultilayerPerceptronClassifier.scala index 1e5b0bc4453e..82fc80c58054 100644 --- a/mllib/src/main/scala/org/apache/spark/ml/classification/MultilayerPerceptronClassifier.scala +++ b/mllib/src/main/scala/org/apache/spark/ml/classification/MultilayerPerceptronClassifier.scala @@ -32,6 +32,7 @@ private[ml] trait MultilayerPerceptronParams extends PredictorParams with HasSeed with HasMaxIter with HasTol { /** * Layer sizes including input size and output size. + * Default: Array(1, 1) * @group param */ final val layers: IntArrayParam = new IntArrayParam(this, "layers", @@ -50,6 +51,7 @@ private[ml] trait MultilayerPerceptronParams extends PredictorParams * Data is stacked within partitions. If block size is more than remaining data in * a partition then it is adjusted to the size of this data. * Recommended size is between 10 and 1000. 
+ * Default: 128 * @group expertParam */ final val blockSize: IntParam = new IntParam(this, "blockSize", diff --git a/mllib/src/main/scala/org/apache/spark/ml/evaluation/BinaryClassificationEvaluator.scala b/mllib/src/main/scala/org/apache/spark/ml/evaluation/BinaryClassificationEvaluator.scala index 56419a0a1595..08df2919a8a8 100644 --- a/mllib/src/main/scala/org/apache/spark/ml/evaluation/BinaryClassificationEvaluator.scala +++ b/mllib/src/main/scala/org/apache/spark/ml/evaluation/BinaryClassificationEvaluator.scala @@ -38,6 +38,7 @@ class BinaryClassificationEvaluator(override val uid: String) /** * param for metric name in evaluation + * Default: areaUnderROC * @group param */ val metricName: Param[String] = { diff --git a/mllib/src/main/scala/org/apache/spark/ml/feature/Binarizer.scala b/mllib/src/main/scala/org/apache/spark/ml/feature/Binarizer.scala index 46314854d5e3..edad75443645 100644 --- a/mllib/src/main/scala/org/apache/spark/ml/feature/Binarizer.scala +++ b/mllib/src/main/scala/org/apache/spark/ml/feature/Binarizer.scala @@ -41,6 +41,7 @@ final class Binarizer(override val uid: String) * Param for threshold used to binarize continuous features. * The features greater than the threshold, will be binarized to 1.0. * The features equal to or less than the threshold, will be binarized to 0.0. + * Default: 0.0 * @group param */ val threshold: DoubleParam = diff --git a/mllib/src/main/scala/org/apache/spark/ml/feature/IDF.scala b/mllib/src/main/scala/org/apache/spark/ml/feature/IDF.scala index 938447447a0a..4c36df75d8aa 100644 --- a/mllib/src/main/scala/org/apache/spark/ml/feature/IDF.scala +++ b/mllib/src/main/scala/org/apache/spark/ml/feature/IDF.scala @@ -35,6 +35,7 @@ private[feature] trait IDFBase extends Params with HasInputCol with HasOutputCol /** * The minimum of documents in which a term should appear. + * Default: 0 * @group param */ final val minDocFreq = new IntParam( diff --git a/mllib/src/main/scala/org/apache/spark/ml/feature/StopWordsRemover.scala b/mllib/src/main/scala/org/apache/spark/ml/feature/StopWordsRemover.scala index 5d77ea08db65..3a0b2faa2f82 100644 --- a/mllib/src/main/scala/org/apache/spark/ml/feature/StopWordsRemover.scala +++ b/mllib/src/main/scala/org/apache/spark/ml/feature/StopWordsRemover.scala @@ -98,6 +98,7 @@ class StopWordsRemover(override val uid: String) /** * the stop words set to be filtered out + * Default: [[StopWords.English]] * @group param */ val stopWords: StringArrayParam = new StringArrayParam(this, "stopWords", "stop words") @@ -110,6 +111,7 @@ class StopWordsRemover(override val uid: String) /** * whether to do a case sensitive comparison over the stop words + * Default: false * @group param */ val caseSensitive: BooleanParam = new BooleanParam(this, "caseSensitive", diff --git a/mllib/src/main/scala/org/apache/spark/ml/feature/StringIndexer.scala b/mllib/src/main/scala/org/apache/spark/ml/feature/StringIndexer.scala index 76f017d7c9d0..8a74da58454e 100644 --- a/mllib/src/main/scala/org/apache/spark/ml/feature/StringIndexer.scala +++ b/mllib/src/main/scala/org/apache/spark/ml/feature/StringIndexer.scala @@ -194,8 +194,8 @@ class IndexToString private[ml] ( /** * Param for array of labels. - * Optional labels to be provided by the user, if not supplied column - * metadata is read for labels. + * Optional labels to be provided by the user. + * Default: Empty array, in which case column metadata is used for labels. 
* @group param */ final val labels: StringArrayParam = new StringArrayParam(this, "labels", diff --git a/mllib/src/main/scala/org/apache/spark/ml/feature/VectorIndexer.scala b/mllib/src/main/scala/org/apache/spark/ml/feature/VectorIndexer.scala index 6875aefe065b..fa9f8237e39b 100644 --- a/mllib/src/main/scala/org/apache/spark/ml/feature/VectorIndexer.scala +++ b/mllib/src/main/scala/org/apache/spark/ml/feature/VectorIndexer.scala @@ -43,6 +43,7 @@ private[ml] trait VectorIndexerParams extends Params with HasInputCol with HasOu * Must be >= 2. * * (default = 20) + * @group param */ val maxCategories = new IntParam(this, "maxCategories", "Threshold for the number of values a categorical feature can take (>= 2)." + diff --git a/mllib/src/main/scala/org/apache/spark/ml/feature/VectorSlicer.scala b/mllib/src/main/scala/org/apache/spark/ml/feature/VectorSlicer.scala index 772bebeff214..9ec6b3e1fa8e 100644 --- a/mllib/src/main/scala/org/apache/spark/ml/feature/VectorSlicer.scala +++ b/mllib/src/main/scala/org/apache/spark/ml/feature/VectorSlicer.scala @@ -49,6 +49,7 @@ final class VectorSlicer(override val uid: String) /** * An array of indices to select features from a vector column. * There can be no overlap with [[names]]. + * Default: Empty array * @group param */ val indices = new IntArrayParam(this, "indices", @@ -67,6 +68,7 @@ final class VectorSlicer(override val uid: String) * An array of feature names to select features from a vector column. * These names must be specified by ML [[org.apache.spark.ml.attribute.Attribute]]s. * There can be no overlap with [[indices]]. + * Default: Empty Array * @group param */ val names = new StringArrayParam(this, "names", diff --git a/mllib/src/main/scala/org/apache/spark/ml/feature/Word2Vec.scala b/mllib/src/main/scala/org/apache/spark/ml/feature/Word2Vec.scala index 5af775a4159a..9edab3af913c 100644 --- a/mllib/src/main/scala/org/apache/spark/ml/feature/Word2Vec.scala +++ b/mllib/src/main/scala/org/apache/spark/ml/feature/Word2Vec.scala @@ -39,6 +39,7 @@ private[feature] trait Word2VecBase extends Params /** * The dimension of the code that you want to transform from words. + * Default: 100 * @group param */ final val vectorSize = new IntParam( @@ -50,6 +51,7 @@ private[feature] trait Word2VecBase extends Params /** * Number of partitions for sentences of words. + * Default: 1 * @group param */ final val numPartitions = new IntParam( @@ -62,6 +64,7 @@ private[feature] trait Word2VecBase extends Params /** * The minimum number of times a token must appear to be included in the word2vec model's * vocabulary. + * Default: 5 * @group param */ final val minCount = new IntParam(this, "minCount", "the minimum number of times a token must " + diff --git a/mllib/src/main/scala/org/apache/spark/ml/regression/IsotonicRegression.scala b/mllib/src/main/scala/org/apache/spark/ml/regression/IsotonicRegression.scala index 0f33bae30e62..d43a3447d397 100644 --- a/mllib/src/main/scala/org/apache/spark/ml/regression/IsotonicRegression.scala +++ b/mllib/src/main/scala/org/apache/spark/ml/regression/IsotonicRegression.scala @@ -40,6 +40,7 @@ private[regression] trait IsotonicRegressionBase extends Params with HasFeatures /** * Param for whether the output sequence should be isotonic/increasing (true) or * antitonic/decreasing (false). 
+ * Default: true * @group param */ final val isotonic: BooleanParam = From ec750a7c302b7b68429743c06739dad22437bec0 Mon Sep 17 00:00:00 2001 From: Tathagata Das Date: Fri, 4 Sep 2015 23:16:39 -1000 Subject: [PATCH 06/15] [SPARK-10440] [STREAMING] [DOCS] Update python API stuff in the programming guides and python docs - Fixed information around Python API tags in streaming programming guides - Added missing stuff in python docs Author: Tathagata Das Closes #8595 from tdas/SPARK-10440. (cherry picked from commit 7a4f326c00fb33c384b4fb927310d687ec063329) Signed-off-by: Reynold Xin --- docs/streaming-flume-integration.md | 2 -- docs/streaming-programming-guide.md | 14 ++++---------- python/docs/index.rst | 8 ++++++++ python/docs/pyspark.streaming.rst | 21 +++++++++++++++++++++ 4 files changed, 33 insertions(+), 12 deletions(-) diff --git a/docs/streaming-flume-integration.md b/docs/streaming-flume-integration.md index de0461010dae..383d954409ce 100644 --- a/docs/streaming-flume-integration.md +++ b/docs/streaming-flume-integration.md @@ -5,8 +5,6 @@ title: Spark Streaming + Flume Integration Guide [Apache Flume](https://flume.apache.org/) is a distributed, reliable, and available service for efficiently collecting, aggregating, and moving large amounts of log data. Here we explain how to configure Flume and Spark Streaming to receive data from Flume. There are two approaches to this. -Python API Flume is not yet available in the Python API. - ## Approach 1: Flume-style Push-based Approach Flume is designed to push data between Flume agents. In this approach, Spark Streaming essentially sets up a receiver that acts an Avro agent for Flume, to which Flume can push the data. Here are the configuration steps. diff --git a/docs/streaming-programming-guide.md b/docs/streaming-programming-guide.md index 118ced298f4b..a1acf83f7524 100644 --- a/docs/streaming-programming-guide.md +++ b/docs/streaming-programming-guide.md @@ -50,13 +50,7 @@ all of which are presented in this guide. You will find tabs throughout this guide that let you choose between code snippets of different languages. -**Note:** Python API for Spark Streaming has been introduced in Spark 1.2. It has all the DStream -transformations and almost all the output operations available in Scala and Java interfaces. -However, it only has support for basic sources like text files and text data over sockets. -APIs for additional sources, like Kafka and Flume, will be available in the future. -Further information about available features in the Python API are mentioned throughout this -document; look out for the tag -Python API. +**Note:** There are a few APIs that are either different or not available in Python. Throughout this guide, you will find the tag Python API highlighting these differences. *************************************************************************************************** @@ -683,7 +677,7 @@ for Java, and [StreamingContext](api/python/pyspark.streaming.html#pyspark.strea {:.no_toc} Python API As of Spark {{site.SPARK_VERSION_SHORT}}, -out of these sources, *only* Kafka, Flume and MQTT are available in the Python API. We will add more advanced sources in the Python API in future. +out of these sources, Kafka, Kinesis, Flume and MQTT are available in the Python API. This category of sources require interfacing with external non-Spark libraries, some of them with complex dependencies (e.g., Kafka and Flume). 
Hence, to minimize issues related to version conflicts @@ -725,9 +719,9 @@ Some of these advanced sources are as follows. - **Kafka:** Spark Streaming {{site.SPARK_VERSION_SHORT}} is compatible with Kafka 0.8.2.1. See the [Kafka Integration Guide](streaming-kafka-integration.html) for more details. -- **Flume:** Spark Streaming {{site.SPARK_VERSION_SHORT}} is compatible with Flume 1.4.0. See the [Flume Integration Guide](streaming-flume-integration.html) for more details. +- **Flume:** Spark Streaming {{site.SPARK_VERSION_SHORT}} is compatible with Flume 1.6.0. See the [Flume Integration Guide](streaming-flume-integration.html) for more details. -- **Kinesis:** See the [Kinesis Integration Guide](streaming-kinesis-integration.html) for more details. +- **Kinesis:** Spark Streaming {{site.SPARK_VERSION_SHORT}} is compatible with Kinesis Client Library 1.2.1. See the [Kinesis Integration Guide](streaming-kinesis-integration.html) for more details. - **Twitter:** Spark Streaming's TwitterUtils uses Twitter4j 3.0.3 to get the public stream of tweets using [Twitter's Streaming API](https://dev.twitter.com/docs/streaming-apis). Authentication information diff --git a/python/docs/index.rst b/python/docs/index.rst index f7eede9c3c82..306ffdb0e0f1 100644 --- a/python/docs/index.rst +++ b/python/docs/index.rst @@ -29,6 +29,14 @@ Core classes: A Resilient Distributed Dataset (RDD), the basic abstraction in Spark. + :class:`pyspark.streaming.StreamingContext` + + Main entry point for Spark Streaming functionality. + + :class:`pyspark.streaming.DStream` + + A Discretized Stream (DStream), the basic abstraction in Spark Streaming. + :class:`pyspark.sql.SQLContext` Main entry point for DataFrame and SQL functionality. diff --git a/python/docs/pyspark.streaming.rst b/python/docs/pyspark.streaming.rst index 50822c93faba..fc52a647543e 100644 --- a/python/docs/pyspark.streaming.rst +++ b/python/docs/pyspark.streaming.rst @@ -15,3 +15,24 @@ pyspark.streaming.kafka module :members: :undoc-members: :show-inheritance: + +pyspark.streaming.kinesis module +-------------------------------- +.. automodule:: pyspark.streaming.kinesis + :members: + :undoc-members: + :show-inheritance: + +pyspark.streaming.flume.module +------------------------------ +.. automodule:: pyspark.streaming.flume + :members: + :undoc-members: + :show-inheritance: + +pyspark.streaming.mqtt module +----------------------------- +.. automodule:: pyspark.streaming.mqtt + :members: + :undoc-members: + :show-inheritance: From 640000b372e7e81f14b9758e712384502af8f215 Mon Sep 17 00:00:00 2001 From: Cheng Lian Date: Sat, 5 Sep 2015 17:50:12 +0800 Subject: [PATCH 07/15] [SPARK-10434] [SQL] Fixes Parquet schema of arrays that may contain null To keep full compatibility of Parquet write path with Spark 1.4, we should rename the innermost field name of arrays that may contain null from "array_element" to "array". Please refer to [SPARK-10434] [1] for more details. [1]: https://issues.apache.org/jira/browse/SPARK-10434 Author: Cheng Lian Closes #8586 from liancheng/spark-10434/fix-parquet-array-type. 
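Illustration (not part of the original commit message, and only a sketch): for a Catalyst schema with a single array column whose elements may be null, the legacy Parquet layout written by Spark changes as outlined below. The expected Parquet schemas in the comments are copied from the ParquetSchemaSuite updates later in this patch; the minimal Scala snippet around them is assumed for illustration only.

    // Sketch for illustration only -- not part of this patch. The Parquet schemas
    // in the comments are taken from the ParquetSchemaSuite changes below.
    import org.apache.spark.sql.types._

    // An array column whose elements may contain null.
    val catalystSchema = StructType(Seq(
      StructField("f1", ArrayType(IntegerType, containsNull = true), nullable = true)))

    // Legacy layout before this patch (innermost field named "array_element"):
    //   message root {
    //     optional group f1 (LIST) {
    //       repeated group bag {
    //         optional int32 array_element;
    //       }
    //     }
    //   }
    //
    // Legacy layout after this patch (innermost field renamed to "array",
    // matching what Spark 1.4 wrote):
    //   message root {
    //     optional group f1 (LIST) {
    //       repeated group bag {
    //         optional int32 array;
    //       }
    //     }
    //   }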
(cherry picked from commit bca8c072bd710beda6cfac1533a67f32f579b134) Signed-off-by: Cheng Lian --- .../parquet/CatalystSchemaConverter.scala | 13 +++++++------ .../datasources/parquet/ParquetSchemaSuite.scala | 6 +++--- 2 files changed, 10 insertions(+), 9 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/CatalystSchemaConverter.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/CatalystSchemaConverter.scala index 535f0684e97f..71161f8bf3e6 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/CatalystSchemaConverter.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/CatalystSchemaConverter.scala @@ -432,13 +432,14 @@ private[parquet] class CatalystSchemaConverter( // ArrayType and MapType (for Spark versions <= 1.4.x) // =================================================== - // Spark 1.4.x and prior versions convert ArrayType with nullable elements into a 3-level - // LIST structure. This behavior mimics parquet-hive (1.6.0rc3). Note that this case is - // covered by the backwards-compatibility rules implemented in `isElementType()`. + // Spark 1.4.x and prior versions convert `ArrayType` with nullable elements into a 3-level + // `LIST` structure. This behavior is somewhat a hybrid of parquet-hive and parquet-avro + // (1.6.0rc3): the 3-level structure is similar to parquet-hive while the 3rd level element + // field name "array" is borrowed from parquet-avro. case ArrayType(elementType, nullable @ true) if !followParquetFormatSpec => // group (LIST) { // optional group bag { - // repeated element; + // repeated array; // } // } ConversionPatterns.listType( @@ -447,8 +448,8 @@ private[parquet] class CatalystSchemaConverter( Types .buildGroup(REPEATED) // "array_element" is the name chosen by parquet-hive (1.7.0 and prior version) - .addField(convertField(StructField("array_element", elementType, nullable))) - .named(CatalystConverter.ARRAY_CONTAINS_NULL_BAG_SCHEMA_NAME)) + .addField(convertField(StructField("array", elementType, nullable))) + .named("bag")) // Spark 1.4.x and prior versions convert ArrayType with non-nullable elements into a 2-level // LIST structure. This behavior mimics parquet-avro (1.6.0rc3). 
Note that this case is diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetSchemaSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetSchemaSuite.scala index 9dcbc1a047be..b344616a9b29 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetSchemaSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetSchemaSuite.scala @@ -196,7 +196,7 @@ class ParquetSchemaInferenceSuite extends ParquetSchemaTest { |message root { | optional group _1 (LIST) { | repeated group bag { - | optional int32 array_element; + | optional int32 array; | } | } |} @@ -265,7 +265,7 @@ class ParquetSchemaInferenceSuite extends ParquetSchemaTest { | optional binary _1 (UTF8); | optional group _2 (LIST) { | repeated group bag { - | optional group array_element { + | optional group array { | required int32 _1; | required double _2; | } @@ -644,7 +644,7 @@ class ParquetSchemaSuite extends ParquetSchemaTest { """message root { | optional group f1 (LIST) { | repeated group bag { - | optional int32 array_element; + | optional int32 array; | } | } |} From 37c5edf1c569d6dcb64e2c572ad2e0c0a6e6ddb2 Mon Sep 17 00:00:00 2001 From: Stephen Hopper Date: Tue, 8 Sep 2015 14:36:34 +0100 Subject: [PATCH 08/15] =?UTF-8?q?[DOC]=20Added=20R=20to=20the=20list=20of?= =?UTF-8?q?=20languages=20with=20"high-level=20API"=20support=20in=20the?= =?UTF-8?q?=E2=80=A6?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit … main README. Author: Stephen Hopper Closes #8646 from enragedginger/master. (cherry picked from commit 9d8e838d883ed21f9ef562e7e3ac074c7e4adb88) Signed-off-by: Sean Owen --- README.md | 4 ++-- docs/quick-start.md | 16 ++++++++-------- 2 files changed, 10 insertions(+), 10 deletions(-) diff --git a/README.md b/README.md index 380422ca00db..76e29b423566 100644 --- a/README.md +++ b/README.md @@ -1,7 +1,7 @@ # Apache Spark Spark is a fast and general cluster computing system for Big Data. It provides -high-level APIs in Scala, Java, and Python, and an optimized engine that +high-level APIs in Scala, Java, Python, and R, and an optimized engine that supports general computation graphs for data analysis. It also supports a rich set of higher-level tools including Spark SQL for SQL and DataFrames, MLlib for machine learning, GraphX for graph processing, @@ -94,5 +94,5 @@ distribution. ## Configuration -Please refer to the [Configuration guide](http://spark.apache.org/docs/latest/configuration.html) +Please refer to the [Configuration Guide](http://spark.apache.org/docs/latest/configuration.html) in the online documentation for an overview on how to configure Spark. diff --git a/docs/quick-start.md b/docs/quick-start.md index ce2cc9d2169c..d481fe0ea6d7 100644 --- a/docs/quick-start.md +++ b/docs/quick-start.md @@ -126,7 +126,7 @@ scala> val wordCounts = textFile.flatMap(line => line.split(" ")).map(word => (w wordCounts: spark.RDD[(String, Int)] = spark.ShuffledAggregatedRDD@71f027b8 {% endhighlight %} -Here, we combined the [`flatMap`](programming-guide.html#transformations), [`map`](programming-guide.html#transformations) and [`reduceByKey`](programming-guide.html#transformations) transformations to compute the per-word counts in the file as an RDD of (String, Int) pairs. 
To collect the word counts in our shell, we can use the [`collect`](programming-guide.html#actions) action: +Here, we combined the [`flatMap`](programming-guide.html#transformations), [`map`](programming-guide.html#transformations), and [`reduceByKey`](programming-guide.html#transformations) transformations to compute the per-word counts in the file as an RDD of (String, Int) pairs. To collect the word counts in our shell, we can use the [`collect`](programming-guide.html#actions) action: {% highlight scala %} scala> wordCounts.collect() @@ -163,7 +163,7 @@ One common data flow pattern is MapReduce, as popularized by Hadoop. Spark can i >>> wordCounts = textFile.flatMap(lambda line: line.split()).map(lambda word: (word, 1)).reduceByKey(lambda a, b: a+b) {% endhighlight %} -Here, we combined the [`flatMap`](programming-guide.html#transformations), [`map`](programming-guide.html#transformations) and [`reduceByKey`](programming-guide.html#transformations) transformations to compute the per-word counts in the file as an RDD of (string, int) pairs. To collect the word counts in our shell, we can use the [`collect`](programming-guide.html#actions) action: +Here, we combined the [`flatMap`](programming-guide.html#transformations), [`map`](programming-guide.html#transformations), and [`reduceByKey`](programming-guide.html#transformations) transformations to compute the per-word counts in the file as an RDD of (string, int) pairs. To collect the word counts in our shell, we can use the [`collect`](programming-guide.html#actions) action: {% highlight python %} >>> wordCounts.collect() @@ -217,13 +217,13 @@ a cluster, as described in the [programming guide](programming-guide.html#initia # Self-Contained Applications -Now say we wanted to write a self-contained application using the Spark API. We will walk through a -simple application in both Scala (with SBT), Java (with Maven), and Python. +Suppose we wish to write a self-contained application using the Spark API. We will walk through a +simple application in Scala (with sbt), Java (with Maven), and Python.
-We'll create a very simple Spark application in Scala. So simple, in fact, that it's +We'll create a very simple Spark application in Scala--so simple, in fact, that it's named `SimpleApp.scala`: {% highlight scala %} @@ -259,7 +259,7 @@ object which contains information about our application. Our application depends on the Spark API, so we'll also include an sbt configuration file, -`simple.sbt` which explains that Spark is a dependency. This file also adds a repository that +`simple.sbt`, which explains that Spark is a dependency. This file also adds a repository that Spark depends on: {% highlight scala %} @@ -302,7 +302,7 @@ Lines with a: 46, Lines with b: 23
-This example will use Maven to compile an application jar, but any similar build system will work. +This example will use Maven to compile an application JAR, but any similar build system will work. We'll create a very simple Spark application, `SimpleApp.java`: @@ -374,7 +374,7 @@ $ find . Now, we can package the application using Maven and execute it with `./bin/spark-submit`. {% highlight bash %} -# Package a jar containing your application +# Package a JAR containing your application $ mvn package ... [INFO] Building jar: {..}/{..}/target/simple-project-1.0.jar From 88a07d89e91c139a65d3a2d46632500a93b615c3 Mon Sep 17 00:00:00 2001 From: Jacek Laskowski Date: Tue, 8 Sep 2015 14:38:10 +0100 Subject: [PATCH 09/15] Docs small fixes Author: Jacek Laskowski Closes #8629 from jaceklaskowski/docs-fixes. (cherry picked from commit 6ceed852ab716d8acc46ce90cba9cfcff6d3616f) Signed-off-by: Sean Owen --- docs/building-spark.md | 23 +++++++++++------------ docs/cluster-overview.md | 15 ++++++++------- 2 files changed, 19 insertions(+), 19 deletions(-) diff --git a/docs/building-spark.md b/docs/building-spark.md index f133eb96d9a2..4db32cfd628b 100644 --- a/docs/building-spark.md +++ b/docs/building-spark.md @@ -61,12 +61,13 @@ If you don't run this, you may see errors like the following: You can fix this by setting the `MAVEN_OPTS` variable as discussed before. **Note:** -* *For Java 8 and above this step is not required.* -* *If using `build/mvn` and `MAVEN_OPTS` were not already set, the script will automate this for you.* + +* For Java 8 and above this step is not required. +* If using `build/mvn` with no `MAVEN_OPTS` set, the script will automate this for you. # Specifying the Hadoop Version -Because HDFS is not protocol-compatible across versions, if you want to read from HDFS, you'll need to build Spark against the specific HDFS version in your environment. You can do this through the "hadoop.version" property. If unset, Spark will build against Hadoop 2.2.0 by default. Note that certain build profiles are required for particular Hadoop versions: +Because HDFS is not protocol-compatible across versions, if you want to read from HDFS, you'll need to build Spark against the specific HDFS version in your environment. You can do this through the `hadoop.version` property. If unset, Spark will build against Hadoop 2.2.0 by default. Note that certain build profiles are required for particular Hadoop versions: @@ -91,7 +92,7 @@ mvn -Dhadoop.version=1.2.1 -Phadoop-1 -DskipTests clean package mvn -Dhadoop.version=2.0.0-mr1-cdh4.2.0 -Phadoop-1 -DskipTests clean package {% endhighlight %} -You can enable the "yarn" profile and optionally set the "yarn.version" property if it is different from "hadoop.version". Spark only supports YARN versions 2.2.0 and later. +You can enable the `yarn` profile and optionally set the `yarn.version` property if it is different from `hadoop.version`. Spark only supports YARN versions 2.2.0 and later. Examples: @@ -125,7 +126,7 @@ mvn -Pyarn -Phadoop-2.4 -Dhadoop.version=2.4.0 -Phive -Phive-thriftserver -Dskip # Building for Scala 2.11 To produce a Spark package compiled with Scala 2.11, use the `-Dscala-2.11` property: - dev/change-scala-version.sh 2.11 + ./dev/change-scala-version.sh 2.11 mvn -Pyarn -Phadoop-2.4 -Dscala-2.11 -DskipTests clean package Spark does not yet support its JDBC component for Scala 2.11. @@ -163,11 +164,9 @@ the `spark-parent` module). 
Thus, the full flow for running continuous-compilation of the `core` submodule may look more like: -``` - $ mvn install - $ cd core - $ mvn scala:cc -``` + $ mvn install + $ cd core + $ mvn scala:cc # Building Spark with IntelliJ IDEA or Eclipse @@ -193,11 +192,11 @@ then ship it over to the cluster. We are investigating the exact cause for this. # Packaging without Hadoop Dependencies for YARN -The assembly jar produced by `mvn package` will, by default, include all of Spark's dependencies, including Hadoop and some of its ecosystem projects. On YARN deployments, this causes multiple versions of these to appear on executor classpaths: the version packaged in the Spark assembly and the version on each node, included with yarn.application.classpath. The `hadoop-provided` profile builds the assembly without including Hadoop-ecosystem projects, like ZooKeeper and Hadoop itself. +The assembly jar produced by `mvn package` will, by default, include all of Spark's dependencies, including Hadoop and some of its ecosystem projects. On YARN deployments, this causes multiple versions of these to appear on executor classpaths: the version packaged in the Spark assembly and the version on each node, included with `yarn.application.classpath`. The `hadoop-provided` profile builds the assembly without including Hadoop-ecosystem projects, like ZooKeeper and Hadoop itself. # Building with SBT -Maven is the official recommendation for packaging Spark, and is the "build of reference". +Maven is the official build tool recommended for packaging Spark, and is the *build of reference*. But SBT is supported for day-to-day development since it can provide much faster iterative compilation. More advanced developers may wish to use SBT. diff --git a/docs/cluster-overview.md b/docs/cluster-overview.md index 7079de546e2f..faaf154d243f 100644 --- a/docs/cluster-overview.md +++ b/docs/cluster-overview.md @@ -5,18 +5,19 @@ title: Cluster Mode Overview This document gives a short overview of how Spark runs on clusters, to make it easier to understand the components involved. Read through the [application submission guide](submitting-applications.html) -to submit applications to a cluster. +to learn about launching applications on a cluster. # Components -Spark applications run as independent sets of processes on a cluster, coordinated by the SparkContext +Spark applications run as independent sets of processes on a cluster, coordinated by the `SparkContext` object in your main program (called the _driver program_). + Specifically, to run on a cluster, the SparkContext can connect to several types of _cluster managers_ -(either Spark's own standalone cluster manager or Mesos/YARN), which allocate resources across +(either Spark's own standalone cluster manager, Mesos or YARN), which allocate resources across applications. Once connected, Spark acquires *executors* on nodes in the cluster, which are processes that run computations and store data for your application. Next, it sends your application code (defined by JAR or Python files passed to SparkContext) to -the executors. Finally, SparkContext sends *tasks* for the executors to run. +the executors. Finally, SparkContext sends *tasks* to the executors to run.
Spark cluster components @@ -33,9 +34,9 @@ There are several useful things to note about this architecture: 2. Spark is agnostic to the underlying cluster manager. As long as it can acquire executor processes, and these communicate with each other, it is relatively easy to run it even on a cluster manager that also supports other applications (e.g. Mesos/YARN). -3. The driver program must listen for and accept incoming connections from its executors throughout - its lifetime (e.g., see [spark.driver.port and spark.fileserver.port in the network config - section](configuration.html#networking)). As such, the driver program must be network +3. The driver program must listen for and accept incoming connections from its executors throughout + its lifetime (e.g., see [spark.driver.port and spark.fileserver.port in the network config + section](configuration.html#networking)). As such, the driver program must be network addressable from the worker nodes. 4. Because the driver schedules tasks on the cluster, it should be run close to the worker nodes, preferably on the same local area network. If you'd like to send requests to the From 34d417e8e947ed2c1884c772a6a5604c87840967 Mon Sep 17 00:00:00 2001 From: Yanbo Liang Date: Tue, 8 Sep 2015 12:48:21 -0700 Subject: [PATCH 10/15] [SPARK-10470] [ML] ml.IsotonicRegressionModel.copy should set parent Copied model must have the same parent, but ml.IsotonicRegressionModel.copy did not set parent. Here fix it and add test case. Author: Yanbo Liang Closes #8637 from yanboliang/spark-10470. (cherry picked from commit f7b55dbfc3343cad988e2490478fce1a11343c73) Signed-off-by: Xiangrui Meng --- .../org/apache/spark/ml/regression/IsotonicRegression.scala | 2 +- .../apache/spark/ml/regression/IsotonicRegressionSuite.scala | 5 +++++ 2 files changed, 6 insertions(+), 1 deletion(-) diff --git a/mllib/src/main/scala/org/apache/spark/ml/regression/IsotonicRegression.scala b/mllib/src/main/scala/org/apache/spark/ml/regression/IsotonicRegression.scala index d43a3447d397..2ff500f291ab 100644 --- a/mllib/src/main/scala/org/apache/spark/ml/regression/IsotonicRegression.scala +++ b/mllib/src/main/scala/org/apache/spark/ml/regression/IsotonicRegression.scala @@ -203,7 +203,7 @@ class IsotonicRegressionModel private[ml] ( def predictions: Vector = Vectors.dense(oldModel.predictions) override def copy(extra: ParamMap): IsotonicRegressionModel = { - copyValues(new IsotonicRegressionModel(uid, oldModel), extra) + copyValues(new IsotonicRegressionModel(uid, oldModel), extra).setParent(parent) } override def transform(dataset: DataFrame): DataFrame = { diff --git a/mllib/src/test/scala/org/apache/spark/ml/regression/IsotonicRegressionSuite.scala b/mllib/src/test/scala/org/apache/spark/ml/regression/IsotonicRegressionSuite.scala index c0ab00b68a2f..59f4193abc8f 100644 --- a/mllib/src/test/scala/org/apache/spark/ml/regression/IsotonicRegressionSuite.scala +++ b/mllib/src/test/scala/org/apache/spark/ml/regression/IsotonicRegressionSuite.scala @@ -19,6 +19,7 @@ package org.apache.spark.ml.regression import org.apache.spark.SparkFunSuite import org.apache.spark.ml.param.ParamsSuite +import org.apache.spark.ml.util.MLTestingUtils import org.apache.spark.mllib.linalg.Vectors import org.apache.spark.mllib.util.MLlibTestSparkContext import org.apache.spark.sql.{DataFrame, Row} @@ -89,6 +90,10 @@ class IsotonicRegressionSuite extends SparkFunSuite with MLlibTestSparkContext { assert(ir.getFeatureIndex === 0) val model = ir.fit(dataset) + + // copied model must have the same parent. 
+ MLTestingUtils.checkCopy(model) + model.transform(dataset) .select("label", "features", "prediction", "weight") .collect() From 7fd4674fc93102f88f961726a0a44006ba6a8140 Mon Sep 17 00:00:00 2001 From: Yin Huai Date: Tue, 8 Sep 2015 14:20:35 -0700 Subject: [PATCH 11/15] [SPARK-10441] [SQL] [BRANCH-1.5] Save data correctly to json. https://issues.apache.org/jira/browse/SPARK-10441 This is the backport of #8597 for 1.5 branch. Author: Yin Huai Closes #8655 from yhuai/timestampJson-1.5. --- .../spark/sql/RandomDataGenerator.scala | 41 +++++++++- .../datasources/json/JacksonGenerator.scala | 11 ++- .../datasources/json/JacksonParser.scala | 31 ++++++++ .../hive/orc/OrcHadoopFsRelationSuite.scala | 8 ++ .../sources/JsonHadoopFsRelationSuite.scala | 8 ++ .../ParquetHadoopFsRelationSuite.scala | 9 ++- .../SimpleTextHadoopFsRelationSuite.scala | 19 ++++- .../sql/sources/SimpleTextRelation.scala | 7 +- .../sql/sources/hadoopFsRelationSuites.scala | 79 +++++++++++++++++++ 9 files changed, 205 insertions(+), 8 deletions(-) diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/RandomDataGenerator.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/RandomDataGenerator.scala index 11e0c120f407..4025cbcec101 100644 --- a/sql/catalyst/src/test/scala/org/apache/spark/sql/RandomDataGenerator.scala +++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/RandomDataGenerator.scala @@ -23,6 +23,8 @@ import java.math.MathContext import scala.util.Random +import org.apache.spark.sql.catalyst.CatalystTypeConverters +import org.apache.spark.sql.catalyst.util.DateTimeUtils import org.apache.spark.sql.types._ import org.apache.spark.unsafe.types.CalendarInterval @@ -84,6 +86,7 @@ object RandomDataGenerator { * random data generator is defined for that data type. The generated values will use an external * representation of the data type; for example, the random generator for [[DateType]] will return * instances of [[java.sql.Date]] and the generator for [[StructType]] will return a [[Row]]. + * For a [[UserDefinedType]] for a class X, an instance of class X is returned. * * @param dataType the type to generate values for * @param nullable whether null values should be generated @@ -106,7 +109,22 @@ object RandomDataGenerator { }) case BooleanType => Some(() => rand.nextBoolean()) case DateType => Some(() => new java.sql.Date(rand.nextInt())) - case TimestampType => Some(() => new java.sql.Timestamp(rand.nextLong())) + case TimestampType => + val generator = + () => { + var milliseconds = rand.nextLong() % 253402329599999L + // -62135740800000L is the number of milliseconds before January 1, 1970, 00:00:00 GMT + // for "0001-01-01 00:00:00.000000". We need to find a + // number that is greater or equals to this number as a valid timestamp value. + while (milliseconds < -62135740800000L) { + // 253402329599999L is the the number of milliseconds since + // January 1, 1970, 00:00:00 GMT for "9999-12-31 23:59:59.999999". + milliseconds = rand.nextLong() % 253402329599999L + } + // DateTimeUtils.toJavaTimestamp takes microsecond. + DateTimeUtils.toJavaTimestamp(milliseconds * 1000) + } + Some(generator) case CalendarIntervalType => Some(() => { val months = rand.nextInt(1000) val ns = rand.nextLong() @@ -159,6 +177,27 @@ object RandomDataGenerator { None } } + case udt: UserDefinedType[_] => { + val maybeSqlTypeGenerator = forType(udt.sqlType, nullable, seed) + // Because random data generator at here returns scala value, we need to + // convert it to catalyst value to call udt's deserialize. 
+ val toCatalystType = CatalystTypeConverters.createToCatalystConverter(udt.sqlType) + + if (maybeSqlTypeGenerator.isDefined) { + val sqlTypeGenerator = maybeSqlTypeGenerator.get + val generator = () => { + val generatedScalaValue = sqlTypeGenerator.apply() + if (generatedScalaValue == null) { + null + } else { + udt.deserialize(toCatalystType(generatedScalaValue)) + } + } + Some(generator) + } else { + None + } + } case unsupportedType => None } // Handle nullability by wrapping the non-null value generator: diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JacksonGenerator.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JacksonGenerator.scala index 330ba907b2ef..f65c7bbd6e29 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JacksonGenerator.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JacksonGenerator.scala @@ -18,6 +18,7 @@ package org.apache.spark.sql.execution.datasources.json import org.apache.spark.sql.catalyst.InternalRow +import org.apache.spark.sql.catalyst.util.DateTimeUtils import scala.collection.Map @@ -89,7 +90,7 @@ private[sql] object JacksonGenerator { def valWriter: (DataType, Any) => Unit = { case (_, null) | (NullType, _) => gen.writeNull() case (StringType, v) => gen.writeString(v.toString) - case (TimestampType, v: java.sql.Timestamp) => gen.writeString(v.toString) + case (TimestampType, v: Long) => gen.writeString(DateTimeUtils.toJavaTimestamp(v).toString) case (IntegerType, v: Int) => gen.writeNumber(v) case (ShortType, v: Short) => gen.writeNumber(v) case (FloatType, v: Float) => gen.writeNumber(v) @@ -99,8 +100,12 @@ private[sql] object JacksonGenerator { case (ByteType, v: Byte) => gen.writeNumber(v.toInt) case (BinaryType, v: Array[Byte]) => gen.writeBinary(v) case (BooleanType, v: Boolean) => gen.writeBoolean(v) - case (DateType, v) => gen.writeString(v.toString) - case (udt: UserDefinedType[_], v) => valWriter(udt.sqlType, udt.serialize(v)) + case (DateType, v: Int) => gen.writeString(DateTimeUtils.toJavaDate(v).toString) + // For UDT values, they should be in the SQL type's corresponding value type. + // We should not see values in the user-defined class at here. + // For example, VectorUDT's SQL type is an array of double. So, we should expect that v is + // an ArrayData at here, instead of a Vector. + case (udt: UserDefinedType[_], v) => valWriter(udt.sqlType, v) case (ArrayType(ty, _), v: ArrayData) => gen.writeStartArray() diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JacksonParser.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JacksonParser.scala index cd68bd667c5c..ff4d8c04e8ea 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JacksonParser.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JacksonParser.scala @@ -81,9 +81,37 @@ private[sql] object JacksonParser { case (VALUE_NUMBER_INT | VALUE_NUMBER_FLOAT, FloatType) => parser.getFloatValue + case (VALUE_STRING, FloatType) => + // Special case handling for NaN and Infinity. 
+ val value = parser.getText + val lowerCaseValue = value.toLowerCase() + if (lowerCaseValue.equals("nan") || + lowerCaseValue.equals("infinity") || + lowerCaseValue.equals("-infinity") || + lowerCaseValue.equals("inf") || + lowerCaseValue.equals("-inf")) { + value.toFloat + } else { + sys.error(s"Cannot parse $value as FloatType.") + } + case (VALUE_NUMBER_INT | VALUE_NUMBER_FLOAT, DoubleType) => parser.getDoubleValue + case (VALUE_STRING, DoubleType) => + // Special case handling for NaN and Infinity. + val value = parser.getText + val lowerCaseValue = value.toLowerCase() + if (lowerCaseValue.equals("nan") || + lowerCaseValue.equals("infinity") || + lowerCaseValue.equals("-infinity") || + lowerCaseValue.equals("inf") || + lowerCaseValue.equals("-inf")) { + value.toDouble + } else { + sys.error(s"Cannot parse $value as DoubleType.") + } + case (VALUE_NUMBER_INT | VALUE_NUMBER_FLOAT, dt: DecimalType) => Decimal(parser.getDecimalValue, dt.precision, dt.scale) @@ -126,6 +154,9 @@ private[sql] object JacksonParser { case (_, udt: UserDefinedType[_]) => convertField(factory, parser, udt.sqlType) + + case (token, dataType) => + sys.error(s"Failed to parse a value for data type $dataType (current token: $token).") } } diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/orc/OrcHadoopFsRelationSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/orc/OrcHadoopFsRelationSuite.scala index deec0048d24b..593e68949ef4 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/orc/OrcHadoopFsRelationSuite.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/orc/OrcHadoopFsRelationSuite.scala @@ -29,6 +29,14 @@ class OrcHadoopFsRelationSuite extends HadoopFsRelationTest { import sqlContext._ import sqlContext.implicits._ + // ORC does not play well with NullType and UDT. + override protected def supportsDataType(dataType: DataType): Boolean = dataType match { + case _: NullType => false + case _: CalendarIntervalType => false + case _: UserDefinedType[_] => false + case _ => true + } + test("save()/load() - partitioned table - simple queries - partition columns in data") { withTempDir { file => val basePath = new Path(file.getCanonicalPath) diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/sources/JsonHadoopFsRelationSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/sources/JsonHadoopFsRelationSuite.scala index 8ca3a1708519..f7386e0db576 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/sources/JsonHadoopFsRelationSuite.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/sources/JsonHadoopFsRelationSuite.scala @@ -30,6 +30,14 @@ class JsonHadoopFsRelationSuite extends HadoopFsRelationTest { import sqlContext._ + // JSON does not write data of NullType and does not play well with BinaryType. 
+ override protected def supportsDataType(dataType: DataType): Boolean = dataType match { + case _: NullType => false + case _: BinaryType => false + case _: CalendarIntervalType => false + case _ => true + } + test("save()/load() - partitioned table - simple queries - partition columns in data") { withTempDir { file => val basePath = new Path(file.getCanonicalPath) diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/sources/ParquetHadoopFsRelationSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/sources/ParquetHadoopFsRelationSuite.scala index 06dadbb5feab..5275ae6511f1 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/sources/ParquetHadoopFsRelationSuite.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/sources/ParquetHadoopFsRelationSuite.scala @@ -24,7 +24,7 @@ import org.apache.hadoop.fs.Path import org.apache.spark.deploy.SparkHadoopUtil import org.apache.spark.sql.{execution, AnalysisException, SaveMode} -import org.apache.spark.sql.types.{IntegerType, StructField, StructType} +import org.apache.spark.sql.types._ class ParquetHadoopFsRelationSuite extends HadoopFsRelationTest { @@ -33,6 +33,13 @@ class ParquetHadoopFsRelationSuite extends HadoopFsRelationTest { import sqlContext._ import sqlContext.implicits._ + // Parquet does not play well with NullType. + override protected def supportsDataType(dataType: DataType): Boolean = dataType match { + case _: NullType => false + case _: CalendarIntervalType => false + case _ => true + } + test("save()/load() - partitioned table - simple queries - partition columns in data") { withTempDir { file => val basePath = new Path(file.getCanonicalPath) diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/sources/SimpleTextHadoopFsRelationSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/sources/SimpleTextHadoopFsRelationSuite.scala index e8975e5f5cd0..bd0abecd37a4 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/sources/SimpleTextHadoopFsRelationSuite.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/sources/SimpleTextHadoopFsRelationSuite.scala @@ -20,13 +20,30 @@ package org.apache.spark.sql.sources import org.apache.hadoop.fs.Path import org.apache.spark.deploy.SparkHadoopUtil -import org.apache.spark.sql.types.{IntegerType, StructField, StructType} +import org.apache.spark.sql.types._ class SimpleTextHadoopFsRelationSuite extends HadoopFsRelationTest { override val dataSourceName: String = classOf[SimpleTextSource].getCanonicalName import sqlContext._ + // We have a very limited number of supported types at here since it is just for a + // test relation and we do very basic testing at here. + override protected def supportsDataType(dataType: DataType): Boolean = dataType match { + case _: BinaryType => false + // We are using random data generator and the generated strings are not really valid string. 
+ case _: StringType => false + case _: BooleanType => false // see https://issues.apache.org/jira/browse/SPARK-10442 + case _: CalendarIntervalType => false + case _: DateType => false + case _: TimestampType => false + case _: ArrayType => false + case _: MapType => false + case _: StructType => false + case _: UserDefinedType[_] => false + case _ => true + } + test("save()/load() - partitioned table - simple queries - partition columns in data") { withTempDir { file => val basePath = new Path(file.getCanonicalPath) diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/sources/SimpleTextRelation.scala b/sql/hive/src/test/scala/org/apache/spark/sql/sources/SimpleTextRelation.scala index 527ca7a81cad..aeaaa3e1c522 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/sources/SimpleTextRelation.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/sources/SimpleTextRelation.scala @@ -68,7 +68,9 @@ class SimpleTextOutputWriter(path: String, context: TaskAttemptContext) extends new AppendingTextOutputFormat(new Path(path)).getRecordWriter(context) override def write(row: Row): Unit = { - val serialized = row.toSeq.map(_.toString).mkString(",") + val serialized = row.toSeq.map { v => + if (v == null) "" else v.toString + }.mkString(",") recordWriter.write(null, new Text(serialized)) } @@ -112,7 +114,8 @@ class SimpleTextRelation( val fields = dataSchema.map(_.dataType) sparkContext.textFile(inputStatuses.map(_.getPath).mkString(",")).map { record => - Row(record.split(",").zip(fields).map { case (value, dataType) => + Row(record.split(",", -1).zip(fields).map { case (v, dataType) => + val value = if (v == "") null else v // `Cast`ed values are always of Catalyst types (i.e. UTF8String instead of String, etc.) val catalystValue = Cast(Literal(value), dataType).eval() // Here we're converting Catalyst values to Scala values to test `needsConversion` diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/sources/hadoopFsRelationSuites.scala b/sql/hive/src/test/scala/org/apache/spark/sql/sources/hadoopFsRelationSuites.scala index 33d8730cadcf..8405b00635fd 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/sources/hadoopFsRelationSuites.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/sources/hadoopFsRelationSuites.scala @@ -40,6 +40,8 @@ abstract class HadoopFsRelationTest extends QueryTest with SQLTestUtils { val dataSourceName: String + protected def supportsDataType(dataType: DataType): Boolean = true + val dataSchema = StructType( Seq( @@ -100,6 +102,83 @@ abstract class HadoopFsRelationTest extends QueryTest with SQLTestUtils { } } + test("test all data types") { + withTempPath { file => + // Create the schema. + val struct = + StructType( + StructField("f1", FloatType, true) :: + StructField("f2", ArrayType(BooleanType), true) :: Nil) + // TODO: add CalendarIntervalType to here once we can save it out. + val dataTypes = + Seq( + StringType, BinaryType, NullType, BooleanType, + ByteType, ShortType, IntegerType, LongType, + FloatType, DoubleType, DecimalType(25, 5), DecimalType(6, 5), + DateType, TimestampType, + ArrayType(IntegerType), MapType(StringType, LongType), struct, + new MyDenseVectorUDT()) + val fields = dataTypes.zipWithIndex.map { case (dataType, index) => + StructField(s"col$index", dataType, nullable = true) + } + val schema = StructType(fields) + + // Generate data at the driver side. We need to materialize the data first and then + // create RDD. 
+ val maybeDataGenerator = + RandomDataGenerator.forType( + dataType = schema, + nullable = true, + seed = Some(System.nanoTime())) + val dataGenerator = + maybeDataGenerator + .getOrElse(fail(s"Failed to create data generator for schema $schema")) + val data = (1 to 10).map { i => + dataGenerator.apply() match { + case row: Row => row + case null => Row.fromSeq(Seq.fill(schema.length)(null)) + case other => + fail(s"Row or null is expected to be generated, " + + s"but a ${other.getClass.getCanonicalName} is generated.") + } + } + + // Create a DF for the schema with random data. + val rdd = sqlContext.sparkContext.parallelize(data, 10) + val df = sqlContext.createDataFrame(rdd, schema) + + // All columns that have supported data types of this source. + val supportedColumns = schema.fields.collect { + case StructField(name, dataType, _, _) if supportsDataType(dataType) => name + } + val selectedColumns = util.Random.shuffle(supportedColumns.toSeq) + + val dfToBeSaved = df.selectExpr(selectedColumns: _*) + + // Save the data out. + dfToBeSaved + .write + .format(dataSourceName) + .option("dataSchema", dfToBeSaved.schema.json) // This option is just used by tests. + .save(file.getCanonicalPath) + + val loadedDF = + sqlContext + .read + .format(dataSourceName) + .schema(dfToBeSaved.schema) + .option("dataSchema", dfToBeSaved.schema.json) // This option is just used by tests. + .load(file.getCanonicalPath) + .selectExpr(selectedColumns: _*) + + // Read the data back. + checkAnswer( + loadedDF, + dfToBeSaved + ) + } + } + test("save()/load() - non-partitioned table - Overwrite") { withTempPath { file => testDF.write.mode(SaveMode.Overwrite).format(dataSourceName).save(file.getCanonicalPath) From f8c4b65b72c197b0f8462188a43082020a234fc9 Mon Sep 17 00:00:00 2001 From: Mark Hamstra Date: Tue, 8 Sep 2015 14:45:50 -0700 Subject: [PATCH 12/15] removed "candidate" from version --- assembly/pom.xml | 2 +- bagel/pom.xml | 2 +- core/pom.xml | 2 +- examples/pom.xml | 2 +- external/flume-assembly/pom.xml | 2 +- external/flume-sink/pom.xml | 2 +- external/flume/pom.xml | 2 +- external/kafka-assembly/pom.xml | 2 +- external/kafka/pom.xml | 2 +- external/mqtt-assembly/pom.xml | 2 +- external/mqtt/pom.xml | 2 +- external/twitter/pom.xml | 2 +- external/zeromq/pom.xml | 2 +- extras/java8-tests/pom.xml | 2 +- extras/kinesis-asl-assembly/pom.xml | 2 +- extras/kinesis-asl/pom.xml | 2 +- extras/spark-ganglia-lgpl/pom.xml | 2 +- graphx/pom.xml | 2 +- launcher/pom.xml | 2 +- mllib/pom.xml | 2 +- network/common/pom.xml | 2 +- network/shuffle/pom.xml | 2 +- network/yarn/pom.xml | 2 +- pom.xml | 2 +- repl/pom.xml | 2 +- sql/catalyst/pom.xml | 2 +- sql/core/pom.xml | 2 +- sql/hive-thriftserver/pom.xml | 2 +- sql/hive/pom.xml | 2 +- streaming/pom.xml | 2 +- tools/pom.xml | 2 +- unsafe/pom.xml | 2 +- yarn/pom.xml | 2 +- 33 files changed, 33 insertions(+), 33 deletions(-) diff --git a/assembly/pom.xml b/assembly/pom.xml index ddbcbb04a38e..1cec5da8a33e 100644 --- a/assembly/pom.xml +++ b/assembly/pom.xml @@ -21,7 +21,7 @@ org.apache.spark spark-parent_2.10 - 1.5.0-candidate-csd-1-SNAPSHOT + 1.5.0-csd-1-SNAPSHOT ../pom.xml diff --git a/bagel/pom.xml b/bagel/pom.xml index 09f0a165125b..d3599a623cfa 100644 --- a/bagel/pom.xml +++ b/bagel/pom.xml @@ -21,7 +21,7 @@ org.apache.spark spark-parent_2.10 - 1.5.0-candidate-csd-1-SNAPSHOT + 1.5.0-csd-1-SNAPSHOT ../pom.xml diff --git a/core/pom.xml b/core/pom.xml index 08b9d5faab55..69c3e5b3601e 100644 --- a/core/pom.xml +++ b/core/pom.xml @@ -21,7 +21,7 @@ org.apache.spark 
spark-parent_2.10 - 1.5.0-candidate-csd-1-SNAPSHOT + 1.5.0-csd-1-SNAPSHOT ../pom.xml diff --git a/examples/pom.xml b/examples/pom.xml index 36d5b10fccf7..3508a98118d2 100644 --- a/examples/pom.xml +++ b/examples/pom.xml @@ -21,7 +21,7 @@ org.apache.spark spark-parent_2.10 - 1.5.0-candidate-csd-1-SNAPSHOT + 1.5.0-csd-1-SNAPSHOT ../pom.xml diff --git a/external/flume-assembly/pom.xml b/external/flume-assembly/pom.xml index 6328813535b5..2325dcfc6170 100644 --- a/external/flume-assembly/pom.xml +++ b/external/flume-assembly/pom.xml @@ -21,7 +21,7 @@ org.apache.spark spark-parent_2.10 - 1.5.0-candidate-csd-1-SNAPSHOT + 1.5.0-csd-1-SNAPSHOT ../../pom.xml diff --git a/external/flume-sink/pom.xml b/external/flume-sink/pom.xml index 1c2a0ebf60dd..19b0c205737a 100644 --- a/external/flume-sink/pom.xml +++ b/external/flume-sink/pom.xml @@ -21,7 +21,7 @@ org.apache.spark spark-parent_2.10 - 1.5.0-candidate-csd-1-SNAPSHOT + 1.5.0-csd-1-SNAPSHOT ../../pom.xml diff --git a/external/flume/pom.xml b/external/flume/pom.xml index f143bd1efbc5..78ebb358d370 100644 --- a/external/flume/pom.xml +++ b/external/flume/pom.xml @@ -21,7 +21,7 @@ org.apache.spark spark-parent_2.10 - 1.5.0-candidate-csd-1-SNAPSHOT + 1.5.0-csd-1-SNAPSHOT ../../pom.xml diff --git a/external/kafka-assembly/pom.xml b/external/kafka-assembly/pom.xml index ef52540ec8e0..c4d6fb16b285 100644 --- a/external/kafka-assembly/pom.xml +++ b/external/kafka-assembly/pom.xml @@ -21,7 +21,7 @@ org.apache.spark spark-parent_2.10 - 1.5.0-candidate-csd-1-SNAPSHOT + 1.5.0-csd-1-SNAPSHOT ../../pom.xml diff --git a/external/kafka/pom.xml b/external/kafka/pom.xml index 89429cd495a9..bbf30cff161d 100644 --- a/external/kafka/pom.xml +++ b/external/kafka/pom.xml @@ -21,7 +21,7 @@ org.apache.spark spark-parent_2.10 - 1.5.0-candidate-csd-1-SNAPSHOT + 1.5.0-csd-1-SNAPSHOT ../../pom.xml diff --git a/external/mqtt-assembly/pom.xml b/external/mqtt-assembly/pom.xml index 0f2fec9ef7ea..19c56abb9134 100644 --- a/external/mqtt-assembly/pom.xml +++ b/external/mqtt-assembly/pom.xml @@ -21,7 +21,7 @@ org.apache.spark spark-parent_2.10 - 1.5.0-candidate-csd-1-SNAPSHOT + 1.5.0-csd-1-SNAPSHOT ../../pom.xml diff --git a/external/mqtt/pom.xml b/external/mqtt/pom.xml index d1c9e356d61e..9b50500bdb9c 100644 --- a/external/mqtt/pom.xml +++ b/external/mqtt/pom.xml @@ -21,7 +21,7 @@ org.apache.spark spark-parent_2.10 - 1.5.0-candidate-csd-1-SNAPSHOT + 1.5.0-csd-1-SNAPSHOT ../../pom.xml diff --git a/external/twitter/pom.xml b/external/twitter/pom.xml index 047b4548d7b5..e93941812091 100644 --- a/external/twitter/pom.xml +++ b/external/twitter/pom.xml @@ -21,7 +21,7 @@ org.apache.spark spark-parent_2.10 - 1.5.0-candidate-csd-1-SNAPSHOT + 1.5.0-csd-1-SNAPSHOT ../../pom.xml diff --git a/external/zeromq/pom.xml b/external/zeromq/pom.xml index c499aa3b1584..a4763c0fbadc 100644 --- a/external/zeromq/pom.xml +++ b/external/zeromq/pom.xml @@ -21,7 +21,7 @@ org.apache.spark spark-parent_2.10 - 1.5.0-candidate-csd-1-SNAPSHOT + 1.5.0-csd-1-SNAPSHOT ../../pom.xml diff --git a/extras/java8-tests/pom.xml b/extras/java8-tests/pom.xml index ab8b90bb7360..5f33f0df5407 100644 --- a/extras/java8-tests/pom.xml +++ b/extras/java8-tests/pom.xml @@ -20,7 +20,7 @@ org.apache.spark spark-parent_2.10 - 1.5.0-candidate-csd-1-SNAPSHOT + 1.5.0-csd-1-SNAPSHOT ../../pom.xml diff --git a/extras/kinesis-asl-assembly/pom.xml b/extras/kinesis-asl-assembly/pom.xml index 852d12551aaf..665b802866f8 100644 --- a/extras/kinesis-asl-assembly/pom.xml +++ b/extras/kinesis-asl-assembly/pom.xml @@ -21,7 +21,7 @@ org.apache.spark 
spark-parent_2.10 - 1.5.0-candidate-csd-1-SNAPSHOT + 1.5.0-csd-1-SNAPSHOT ../../pom.xml diff --git a/extras/kinesis-asl/pom.xml b/extras/kinesis-asl/pom.xml index ad79760d57f6..9978f3208ff6 100644 --- a/extras/kinesis-asl/pom.xml +++ b/extras/kinesis-asl/pom.xml @@ -20,7 +20,7 @@ org.apache.spark spark-parent_2.10 - 1.5.0-candidate-csd-1-SNAPSHOT + 1.5.0-csd-1-SNAPSHOT ../../pom.xml diff --git a/extras/spark-ganglia-lgpl/pom.xml b/extras/spark-ganglia-lgpl/pom.xml index 8b81fee7cbe4..04f7eba766ee 100644 --- a/extras/spark-ganglia-lgpl/pom.xml +++ b/extras/spark-ganglia-lgpl/pom.xml @@ -20,7 +20,7 @@ org.apache.spark spark-parent_2.10 - 1.5.0-candidate-csd-1-SNAPSHOT + 1.5.0-csd-1-SNAPSHOT ../../pom.xml diff --git a/graphx/pom.xml b/graphx/pom.xml index 2564d7a52264..92efdd7a335d 100644 --- a/graphx/pom.xml +++ b/graphx/pom.xml @@ -21,7 +21,7 @@ org.apache.spark spark-parent_2.10 - 1.5.0-candidate-csd-1-SNAPSHOT + 1.5.0-csd-1-SNAPSHOT ../pom.xml diff --git a/launcher/pom.xml b/launcher/pom.xml index 2a042e3c5030..e3b014c7e35a 100644 --- a/launcher/pom.xml +++ b/launcher/pom.xml @@ -22,7 +22,7 @@ org.apache.spark spark-parent_2.10 - 1.5.0-candidate-csd-1-SNAPSHOT + 1.5.0-csd-1-SNAPSHOT ../pom.xml diff --git a/mllib/pom.xml b/mllib/pom.xml index c626bb003318..186b3272ea18 100644 --- a/mllib/pom.xml +++ b/mllib/pom.xml @@ -21,7 +21,7 @@ org.apache.spark spark-parent_2.10 - 1.5.0-candidate-csd-1-SNAPSHOT + 1.5.0-csd-1-SNAPSHOT ../pom.xml diff --git a/network/common/pom.xml b/network/common/pom.xml index 7149a90840a8..bdd0011cbb0d 100644 --- a/network/common/pom.xml +++ b/network/common/pom.xml @@ -22,7 +22,7 @@ org.apache.spark spark-parent_2.10 - 1.5.0-candidate-csd-1-SNAPSHOT + 1.5.0-csd-1-SNAPSHOT ../../pom.xml diff --git a/network/shuffle/pom.xml b/network/shuffle/pom.xml index ef89371b252f..2e874683400d 100644 --- a/network/shuffle/pom.xml +++ b/network/shuffle/pom.xml @@ -22,7 +22,7 @@ org.apache.spark spark-parent_2.10 - 1.5.0-candidate-csd-1-SNAPSHOT + 1.5.0-csd-1-SNAPSHOT ../../pom.xml diff --git a/network/yarn/pom.xml b/network/yarn/pom.xml index a3ad070b3a41..e081ad139dbe 100644 --- a/network/yarn/pom.xml +++ b/network/yarn/pom.xml @@ -22,7 +22,7 @@ org.apache.spark spark-parent_2.10 - 1.5.0-candidate-csd-1-SNAPSHOT + 1.5.0-csd-1-SNAPSHOT ../../pom.xml diff --git a/pom.xml b/pom.xml index eb10be6b532a..be6285dd4618 100644 --- a/pom.xml +++ b/pom.xml @@ -26,7 +26,7 @@ org.apache.spark spark-parent_2.10 - 1.5.0-candidate-csd-1-SNAPSHOT + 1.5.0-csd-1-SNAPSHOT pom Spark Project Parent POM http://spark.apache.org/ diff --git a/repl/pom.xml b/repl/pom.xml index 09e3f3a4a796..1f23ae58a358 100644 --- a/repl/pom.xml +++ b/repl/pom.xml @@ -21,7 +21,7 @@ org.apache.spark spark-parent_2.10 - 1.5.0-candidate-csd-1-SNAPSHOT + 1.5.0-csd-1-SNAPSHOT ../pom.xml diff --git a/sql/catalyst/pom.xml b/sql/catalyst/pom.xml index 90fd584fd234..4b547d45c79a 100644 --- a/sql/catalyst/pom.xml +++ b/sql/catalyst/pom.xml @@ -22,7 +22,7 @@ org.apache.spark spark-parent_2.10 - 1.5.0-candidate-csd-1-SNAPSHOT + 1.5.0-csd-1-SNAPSHOT ../../pom.xml diff --git a/sql/core/pom.xml b/sql/core/pom.xml index 294605182714..2b00d8f7fc1b 100644 --- a/sql/core/pom.xml +++ b/sql/core/pom.xml @@ -22,7 +22,7 @@ org.apache.spark spark-parent_2.10 - 1.5.0-candidate-csd-1-SNAPSHOT + 1.5.0-csd-1-SNAPSHOT ../../pom.xml diff --git a/sql/hive-thriftserver/pom.xml b/sql/hive-thriftserver/pom.xml index 7da500adb1d3..7bbce14e3ff1 100644 --- a/sql/hive-thriftserver/pom.xml +++ b/sql/hive-thriftserver/pom.xml @@ -22,7 +22,7 @@ org.apache.spark 
spark-parent_2.10 - 1.5.0-candidate-csd-1-SNAPSHOT + 1.5.0-csd-1-SNAPSHOT ../../pom.xml diff --git a/sql/hive/pom.xml b/sql/hive/pom.xml index 248d166b5df6..7d91418ef189 100644 --- a/sql/hive/pom.xml +++ b/sql/hive/pom.xml @@ -22,7 +22,7 @@ org.apache.spark spark-parent_2.10 - 1.5.0-candidate-csd-1-SNAPSHOT + 1.5.0-csd-1-SNAPSHOT ../../pom.xml diff --git a/streaming/pom.xml b/streaming/pom.xml index a7f422d694ce..f9d4cbd8479a 100644 --- a/streaming/pom.xml +++ b/streaming/pom.xml @@ -21,7 +21,7 @@ org.apache.spark spark-parent_2.10 - 1.5.0-candidate-csd-1-SNAPSHOT + 1.5.0-csd-1-SNAPSHOT ../pom.xml diff --git a/tools/pom.xml b/tools/pom.xml index 0ec3c6b3cfb8..342241402cd4 100644 --- a/tools/pom.xml +++ b/tools/pom.xml @@ -20,7 +20,7 @@ org.apache.spark spark-parent_2.10 - 1.5.0-candidate-csd-1-SNAPSHOT + 1.5.0-csd-1-SNAPSHOT ../pom.xml diff --git a/unsafe/pom.xml b/unsafe/pom.xml index 544d8b561946..61e4dcf70244 100644 --- a/unsafe/pom.xml +++ b/unsafe/pom.xml @@ -22,7 +22,7 @@ org.apache.spark spark-parent_2.10 - 1.5.0-candidate-csd-1-SNAPSHOT + 1.5.0-csd-1-SNAPSHOT ../pom.xml diff --git a/yarn/pom.xml b/yarn/pom.xml index fb7d9334a7d8..b859a477108f 100644 --- a/yarn/pom.xml +++ b/yarn/pom.xml @@ -20,7 +20,7 @@ org.apache.spark spark-parent_2.10 - 1.5.0-candidate-csd-1-SNAPSHOT + 1.5.0-csd-1-SNAPSHOT ../pom.xml From 63c72b93eb51685814543a39caf9a6d221e2583c Mon Sep 17 00:00:00 2001 From: Tathagata Das Date: Tue, 8 Sep 2015 14:54:43 -0700 Subject: [PATCH 13/15] [SPARK-10492] [STREAMING] [DOCUMENTATION] Update Streaming documentation about rate limiting and backpressure Author: Tathagata Das Closes #8656 from tdas/SPARK-10492 and squashes the following commits: 986cdd6 [Tathagata Das] Added information on backpressure (cherry picked from commit 52b24a602ad615a7f6aa427aefb1c7444c05d298) Signed-off-by: Tathagata Das --- docs/configuration.md | 13 +++++++++++++ docs/streaming-programming-guide.md | 13 ++++++++++++- 2 files changed, 25 insertions(+), 1 deletion(-) diff --git a/docs/configuration.md b/docs/configuration.md index 77c5cbc7b319..353efdbae522 100644 --- a/docs/configuration.md +++ b/docs/configuration.md @@ -1437,6 +1437,19 @@ Apart from these, the following properties are also available, and may be useful #### Spark Streaming

+ + + + + diff --git a/docs/streaming-programming-guide.md b/docs/streaming-programming-guide.md index a1acf83f7524..c751dbb41785 100644 --- a/docs/streaming-programming-guide.md +++ b/docs/streaming-programming-guide.md @@ -1807,7 +1807,7 @@ To run a Spark Streaming applications, you need to have the following. + *Mesos* - [Marathon](https://github.com/mesosphere/marathon) has been used to achieve this with Mesos. -- *[Since Spark 1.2] Configuring write ahead logs* - Since Spark 1.2, +- *Configuring write ahead logs* - Since Spark 1.2, we have introduced _write ahead logs_ for achieving strong fault-tolerance guarantees. If enabled, all the data received from a receiver gets written into a write ahead log in the configuration checkpoint directory. This prevents data loss on driver @@ -1822,6 +1822,17 @@ To run a Spark Streaming applications, you need to have the following. stored in a replicated storage system. This can be done by setting the storage level for the input stream to `StorageLevel.MEMORY_AND_DISK_SER`. +- *Setting the max receiving rate* - If the cluster resources is not large enough for the streaming + application to process data as fast as it is being received, the receivers can be rate limited + by setting a maximum rate limit in terms of records / sec. + See the [configuration parameters](configuration.html#spark-streaming) + `spark.streaming.receiver.maxRate` for receivers and `spark.streaming.kafka.maxRatePerPartition` + for Direct Kafka approach. In Spark 1.5, we have introduced a feature called *backpressure* that + eliminate the need to set this rate limit, as Spark Streaming automatically figures out the + rate limits and dynamically adjusts them if the processing conditions change. This backpressure + can be enabled by setting the [configuration parameter](configuration.html#spark-streaming) + `spark.streaming.backpressure.enabled` to `true`. + ### Upgrading Application Code {:.no_toc} From fca16c59da75b08d18cb9d6da7942cd24b05518e Mon Sep 17 00:00:00 2001 From: Cheng Lian Date: Tue, 8 Sep 2015 20:30:24 -0700 Subject: [PATCH 14/15] [SPARK-10301] [SPARK-10428] [SQL] [BRANCH-1.5] Fixes schema merging for nested structs We used to workaround SPARK-10301 with a quick fix in branch-1.5 (PR #8515), but it doesn't cover the case described in SPARK-10428. So this PR backports PR #8509, which had once been considered too big a change to be merged into branch-1.5 in the last minute, to fix both SPARK-10301 and SPARK-10428 for Spark 1.5. Also added more test cases for SPARK-10428. This PR looks big, but the essential change is only ~200 loc. All other changes are for testing. Especially, PR #8454 is also backported here because the `ParquetInteroperabilitySuite` introduced in PR #8515 depends on it. This should be safe since #8454 only touches testing code. Author: Cheng Lian Closes #8583 from liancheng/spark-10301/for-1.5. 
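A minimal sketch of the user-facing behaviour this backport targets, modelled on the test cases added below; the output path is hypothetical and a spark-shell style `sqlContext` is assumed:

    import org.apache.spark.sql.SaveMode

    // Two DataFrames whose nested struct column "s" has overlapping but different fields.
    val df1 = sqlContext.range(1).selectExpr("NAMED_STRUCT('a', id, 'c', id + 2) AS s")
    val df2 = sqlContext.range(1, 2).selectExpr("NAMED_STRUCT('a', id, 'b', id + 1, 'c', id + 2) AS s")

    // Append both to the same (hypothetical) directory, producing two physical schemas.
    df1.write.mode(SaveMode.Append).parquet("/tmp/nested-structs")
    df2.write.mode(SaveMode.Append).parquet("/tmp/nested-structs")

    // With this fix, merging the schemas and selecting all three nested fields succeeds;
    // the field missing from the first file comes back as null.
    sqlContext.read.option("mergeSchema", "true").parquet("/tmp/nested-structs")
      .selectExpr("s.a", "s.b", "s.c")
      .show()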
--- .../parquet/CatalystReadSupport.scala | 243 +++++--- .../parquet/CatalystRowConverter.scala | 63 +-- .../parquet/CatalystSchemaConverter.scala | 14 +- .../parquet/DirectParquetWriter.scala | 81 +++ .../ParquetInteroperabilitySuite.scala | 111 ++++ .../parquet/ParquetQuerySuite.scala | 322 +++++++++++ .../parquet/ParquetSchemaSuite.scala | 527 ++++++++++++++++++ 7 files changed, 1233 insertions(+), 128 deletions(-) create mode 100644 sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/DirectParquetWriter.scala create mode 100644 sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetInteroperabilitySuite.scala diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/CatalystReadSupport.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/CatalystReadSupport.scala index 3f8353af6e2a..00f36caeaef4 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/CatalystReadSupport.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/CatalystReadSupport.scala @@ -19,17 +19,18 @@ package org.apache.spark.sql.execution.datasources.parquet import java.util.{Map => JMap} -import scala.collection.JavaConversions.{iterableAsScalaIterable, mapAsJavaMap, mapAsScalaMap} +import scala.collection.JavaConverters._ import org.apache.hadoop.conf.Configuration import org.apache.parquet.hadoop.api.ReadSupport.ReadContext import org.apache.parquet.hadoop.api.{InitContext, ReadSupport} import org.apache.parquet.io.api.RecordMaterializer -import org.apache.parquet.schema.MessageType +import org.apache.parquet.schema.Type.Repetition +import org.apache.parquet.schema._ import org.apache.spark.Logging import org.apache.spark.sql.catalyst.InternalRow -import org.apache.spark.sql.types.StructType +import org.apache.spark.sql.types._ private[parquet] class CatalystReadSupport extends ReadSupport[InternalRow] with Logging { // Called after `init()` when initializing Parquet record reader. @@ -44,7 +45,7 @@ private[parquet] class CatalystReadSupport extends ReadSupport[InternalRow] with val parquetRequestedSchema = readContext.getRequestedSchema val catalystRequestedSchema = - Option(readContext.getReadSupportMetadata).map(_.toMap).flatMap { metadata => + Option(readContext.getReadSupportMetadata).map(_.asScala).flatMap { metadata => metadata // First tries to read requested schema, which may result from projections .get(CatalystReadSupport.SPARK_ROW_REQUESTED_SCHEMA) @@ -81,70 +82,10 @@ private[parquet] class CatalystReadSupport extends ReadSupport[InternalRow] with // `StructType` containing all requested columns. val maybeRequestedSchema = Option(conf.get(CatalystReadSupport.SPARK_ROW_REQUESTED_SCHEMA)) - // Below we construct a Parquet schema containing all requested columns. This schema tells - // Parquet which columns to read. - // - // If `maybeRequestedSchema` is defined, we assemble an equivalent Parquet schema. Otherwise, - // we have to fallback to the full file schema which contains all columns in the file. - // Obviously this may waste IO bandwidth since it may read more columns than requested. - // - // Two things to note: - // - // 1. It's possible that some requested columns don't exist in the target Parquet file. For - // example, in the case of schema merging, the globally merged schema may contain extra - // columns gathered from other Parquet files. 
These columns will be simply filled with nulls - // when actually reading the target Parquet file. - // - // 2. When `maybeRequestedSchema` is available, we can't simply convert the Catalyst schema to - // Parquet schema using `CatalystSchemaConverter`, because the mapping is not unique due to - // non-standard behaviors of some Parquet libraries/tools. For example, a Parquet file - // containing a single integer array field `f1` may have the following legacy 2-level - // structure: - // - // message root { - // optional group f1 (LIST) { - // required INT32 element; - // } - // } - // - // while `CatalystSchemaConverter` may generate a standard 3-level structure: - // - // message root { - // optional group f1 (LIST) { - // repeated group list { - // required INT32 element; - // } - // } - // } - // - // Apparently, we can't use the 2nd schema to read the target Parquet file as they have - // different physical structures. val parquetRequestedSchema = maybeRequestedSchema.fold(context.getFileSchema) { schemaString => - val toParquet = new CatalystSchemaConverter(conf) - val fileSchema = context.getFileSchema.asGroupType() - val fileFieldNames = fileSchema.getFields.map(_.getName).toSet - - StructType - // Deserializes the Catalyst schema of requested columns - .fromString(schemaString) - .map { field => - if (fileFieldNames.contains(field.name)) { - // If the field exists in the target Parquet file, extracts the field type from the - // full file schema and makes a single-field Parquet schema - new MessageType("root", fileSchema.getType(field.name)) - } else { - // Otherwise, just resorts to `CatalystSchemaConverter` - toParquet.convert(StructType(Array(field))) - } - } - // Merges all single-field Parquet schemas to form a complete schema for all requested - // columns. Note that it's possible that no columns are requested at all (e.g., count - // some partition column of a partitioned Parquet table). That's why `fold` is used here - // and always fallback to an empty Parquet schema. - .fold(new MessageType("root")) { - _ union _ - } + val catalystRequestedSchema = StructType.fromString(schemaString) + CatalystReadSupport.clipParquetSchema(context.getFileSchema, catalystRequestedSchema) } val metadata = @@ -152,7 +93,7 @@ private[parquet] class CatalystReadSupport extends ReadSupport[InternalRow] with maybeRequestedSchema.map(CatalystReadSupport.SPARK_ROW_REQUESTED_SCHEMA -> _) ++ maybeRowSchema.map(RowWriteSupport.SPARK_ROW_SCHEMA -> _) - new ReadContext(parquetRequestedSchema, metadata) + new ReadContext(parquetRequestedSchema, metadata.asJava) } } @@ -160,4 +101,172 @@ private[parquet] object CatalystReadSupport { val SPARK_ROW_REQUESTED_SCHEMA = "org.apache.spark.sql.parquet.row.requested_schema" val SPARK_METADATA_KEY = "org.apache.spark.sql.parquet.row.metadata" + + /** + * Tailors `parquetSchema` according to `catalystSchema` by removing column paths don't exist + * in `catalystSchema`, and adding those only exist in `catalystSchema`. + */ + def clipParquetSchema(parquetSchema: MessageType, catalystSchema: StructType): MessageType = { + val clippedParquetFields = clipParquetGroupFields(parquetSchema.asGroupType(), catalystSchema) + Types.buildMessage().addFields(clippedParquetFields: _*).named("root") + } + + private def clipParquetType(parquetType: Type, catalystType: DataType): Type = { + catalystType match { + case t: ArrayType if !isPrimitiveCatalystType(t.elementType) => + // Only clips array types with nested type as element type. 
+ clipParquetListType(parquetType.asGroupType(), t.elementType) + + case t: MapType + if !isPrimitiveCatalystType(t.keyType) || + !isPrimitiveCatalystType(t.valueType) => + // Only clips map types with nested key type or value type + clipParquetMapType(parquetType.asGroupType(), t.keyType, t.valueType) + + case t: StructType => + clipParquetGroup(parquetType.asGroupType(), t) + + case _ => + // UDTs and primitive types are not clipped. For UDTs, a clipped version might not be able + // to be mapped to desired user-space types. So UDTs shouldn't participate schema merging. + parquetType + } + } + + /** + * Whether a Catalyst [[DataType]] is primitive. Primitive [[DataType]] is not equivalent to + * [[AtomicType]]. For example, [[CalendarIntervalType]] is primitive, but it's not an + * [[AtomicType]]. + */ + private def isPrimitiveCatalystType(dataType: DataType): Boolean = { + dataType match { + case _: ArrayType | _: MapType | _: StructType => false + case _ => true + } + } + + /** + * Clips a Parquet [[GroupType]] which corresponds to a Catalyst [[ArrayType]]. The element type + * of the [[ArrayType]] should also be a nested type, namely an [[ArrayType]], a [[MapType]], or a + * [[StructType]]. + */ + private def clipParquetListType(parquetList: GroupType, elementType: DataType): Type = { + // Precondition of this method, should only be called for lists with nested element types. + assert(!isPrimitiveCatalystType(elementType)) + + // Unannotated repeated group should be interpreted as required list of required element, so + // list element type is just the group itself. Clip it. + if (parquetList.getOriginalType == null && parquetList.isRepetition(Repetition.REPEATED)) { + clipParquetType(parquetList, elementType) + } else { + assert( + parquetList.getOriginalType == OriginalType.LIST, + "Invalid Parquet schema. " + + "Original type of annotated Parquet lists must be LIST: " + + parquetList.toString) + + assert( + parquetList.getFieldCount == 1 && parquetList.getType(0).isRepetition(Repetition.REPEATED), + "Invalid Parquet schema. " + + "LIST-annotated group should only have exactly one repeated field: " + + parquetList) + + // Precondition of this method, should only be called for lists with nested element types. + assert(!parquetList.getType(0).isPrimitive) + + val repeatedGroup = parquetList.getType(0).asGroupType() + + // If the repeated field is a group with multiple fields, or the repeated field is a group + // with one field and is named either "array" or uses the LIST-annotated group's name with + // "_tuple" appended then the repeated type is the element type and elements are required. + // Build a new LIST-annotated group with clipped `repeatedGroup` as element type and the + // only field. + if ( + repeatedGroup.getFieldCount > 1 || + repeatedGroup.getName == "array" || + repeatedGroup.getName == parquetList.getName + "_tuple" + ) { + Types + .buildGroup(parquetList.getRepetition) + .as(OriginalType.LIST) + .addField(clipParquetType(repeatedGroup, elementType)) + .named(parquetList.getName) + } else { + // Otherwise, the repeated field's type is the element type with the repeated field's + // repetition. + Types + .buildGroup(parquetList.getRepetition) + .as(OriginalType.LIST) + .addField( + Types + .repeatedGroup() + .addField(clipParquetType(repeatedGroup.getType(0), elementType)) + .named(repeatedGroup.getName)) + .named(parquetList.getName) + } + } + } + + /** + * Clips a Parquet [[GroupType]] which corresponds to a Catalyst [[MapType]]. 
Either key type or + * value type of the [[MapType]] must be a nested type, namely an [[ArrayType]], a [[MapType]], or + * a [[StructType]]. + */ + private def clipParquetMapType( + parquetMap: GroupType, keyType: DataType, valueType: DataType): GroupType = { + // Precondition of this method, only handles maps with nested key types or value types. + assert(!isPrimitiveCatalystType(keyType) || !isPrimitiveCatalystType(valueType)) + + val repeatedGroup = parquetMap.getType(0).asGroupType() + val parquetKeyType = repeatedGroup.getType(0) + val parquetValueType = repeatedGroup.getType(1) + + val clippedRepeatedGroup = + Types + .repeatedGroup() + .as(repeatedGroup.getOriginalType) + .addField(clipParquetType(parquetKeyType, keyType)) + .addField(clipParquetType(parquetValueType, valueType)) + .named(repeatedGroup.getName) + + Types + .buildGroup(parquetMap.getRepetition) + .as(parquetMap.getOriginalType) + .addField(clippedRepeatedGroup) + .named(parquetMap.getName) + } + + /** + * Clips a Parquet [[GroupType]] which corresponds to a Catalyst [[StructType]]. + * + * @return A clipped [[GroupType]], which has at least one field. + * @note Parquet doesn't allow creating empty [[GroupType]] instances except for empty + * [[MessageType]]. Because it's legal to construct an empty requested schema for column + * pruning. + */ + private def clipParquetGroup(parquetRecord: GroupType, structType: StructType): GroupType = { + val clippedParquetFields = clipParquetGroupFields(parquetRecord, structType) + Types + .buildGroup(parquetRecord.getRepetition) + .as(parquetRecord.getOriginalType) + .addFields(clippedParquetFields: _*) + .named(parquetRecord.getName) + } + + /** + * Clips a Parquet [[GroupType]] which corresponds to a Catalyst [[StructType]]. + * + * @return A list of clipped [[GroupType]] fields, which can be empty. + */ + private def clipParquetGroupFields( + parquetRecord: GroupType, structType: StructType): Seq[Type] = { + val parquetFieldMap = parquetRecord.getFields.asScala.map(f => f.getName -> f).toMap + val toParquet = new CatalystSchemaConverter(followParquetFormatSpec = true) + structType.map { f => + parquetFieldMap + .get(f.name) + .map(clipParquetType(_, f.dataType)) + .getOrElse(toParquet.convertField(f)) + } + } } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/CatalystRowConverter.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/CatalystRowConverter.scala index d99bfe4cd85d..2ff2fda3610b 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/CatalystRowConverter.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/CatalystRowConverter.scala @@ -20,7 +20,7 @@ package org.apache.spark.sql.execution.datasources.parquet import java.math.{BigDecimal, BigInteger} import java.nio.ByteOrder -import scala.collection.JavaConversions._ +import scala.collection.JavaConverters._ import scala.collection.mutable.ArrayBuffer import org.apache.parquet.column.Dictionary @@ -113,31 +113,6 @@ private[parquet] class CatalystPrimitiveConverter(val updater: ParentContainerUp * When used as a root converter, [[NoopUpdater]] should be used since root converters don't have * any "parent" container. * - * @note Constructor argument [[parquetType]] refers to requested fields of the actual schema of the - * Parquet file being read, while constructor argument [[catalystType]] refers to requested - * fields of the global schema. 
The key difference is that, in case of schema merging, - * [[parquetType]] can be a subset of [[catalystType]]. For example, it's possible to have - * the following [[catalystType]]: - * {{{ - * new StructType() - * .add("f1", IntegerType, nullable = false) - * .add("f2", StringType, nullable = true) - * .add("f3", new StructType() - * .add("f31", DoubleType, nullable = false) - * .add("f32", IntegerType, nullable = true) - * .add("f33", StringType, nullable = true), nullable = false) - * }}} - * and the following [[parquetType]] (`f2` and `f32` are missing): - * {{{ - * message root { - * required int32 f1; - * required group f3 { - * required double f31; - * optional binary f33 (utf8); - * } - * } - * }}} - * * @param parquetType Parquet schema of Parquet records * @param catalystType Spark SQL schema that corresponds to the Parquet record type * @param updater An updater which propagates converted field values to the parent container @@ -148,6 +123,16 @@ private[parquet] class CatalystRowConverter( updater: ParentContainerUpdater) extends CatalystGroupConverter(updater) with Logging { + assert( + parquetType.getFieldCount == catalystType.length, + s"""Field counts of the Parquet schema and the Catalyst schema don't match: + | + |Parquet schema: + |$parquetType + |Catalyst schema: + |${catalystType.prettyJson} + """.stripMargin) + logDebug( s"""Building row converter for the following schema: | @@ -179,31 +164,7 @@ private[parquet] class CatalystRowConverter( // Converters for each field. private val fieldConverters: Array[Converter with HasParentContainerUpdater] = { - // In case of schema merging, `parquetType` can be a subset of `catalystType`. We need to pad - // those missing fields and create converters for them, although values of these fields are - // always null. - val paddedParquetFields = { - val parquetFields = parquetType.getFields - val parquetFieldNames = parquetFields.map(_.getName).toSet - val missingFields = catalystType.filterNot(f => parquetFieldNames.contains(f.name)) - - // We don't need to worry about feature flag arguments like `assumeBinaryIsString` when - // creating the schema converter here, since values of missing fields are always null. - val toParquet = new CatalystSchemaConverter() - - (parquetFields ++ missingFields.map(toParquet.convertField)).sortBy { f => - catalystType.indexWhere(_.name == f.getName) - } - } - - if (paddedParquetFields.length != catalystType.length) { - throw new UnsupportedOperationException( - "A Parquet file's schema has different number of fields with the table schema. 
" + - "Please enable schema merging by setting \"mergeSchema\" to true when load " + - "a Parquet dataset or set spark.sql.parquet.mergeSchema to true in SQLConf.") - } - - paddedParquetFields.zip(catalystType).zipWithIndex.map { + parquetType.getFields.asScala.zip(catalystType).zipWithIndex.map { case ((parquetFieldType, catalystField), ordinal) => // Converted field value should be set to the `ordinal`-th cell of `currentRow` newConverter(parquetFieldType, catalystField.dataType, new RowUpdater(currentRow, ordinal)) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/CatalystSchemaConverter.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/CatalystSchemaConverter.scala index 71161f8bf3e6..9c539130d422 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/CatalystSchemaConverter.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/CatalystSchemaConverter.scala @@ -55,16 +55,10 @@ import org.apache.spark.sql.{AnalysisException, SQLConf} * to old style non-standard behaviors. */ private[parquet] class CatalystSchemaConverter( - private val assumeBinaryIsString: Boolean, - private val assumeInt96IsTimestamp: Boolean, - private val followParquetFormatSpec: Boolean) { - - // Only used when constructing converter for converting Spark SQL schema to Parquet schema, in - // which case `assumeInt96IsTimestamp` and `assumeBinaryIsString` are irrelevant. - def this() = this( - assumeBinaryIsString = SQLConf.PARQUET_BINARY_AS_STRING.defaultValue.get, - assumeInt96IsTimestamp = SQLConf.PARQUET_INT96_AS_TIMESTAMP.defaultValue.get, - followParquetFormatSpec = SQLConf.PARQUET_FOLLOW_PARQUET_FORMAT_SPEC.defaultValue.get) + assumeBinaryIsString: Boolean = SQLConf.PARQUET_BINARY_AS_STRING.defaultValue.get, + assumeInt96IsTimestamp: Boolean = SQLConf.PARQUET_INT96_AS_TIMESTAMP.defaultValue.get, + followParquetFormatSpec: Boolean = SQLConf.PARQUET_FOLLOW_PARQUET_FORMAT_SPEC.defaultValue.get +) { def this(conf: SQLConf) = this( assumeBinaryIsString = conf.isParquetBinaryAsString, diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/DirectParquetWriter.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/DirectParquetWriter.scala new file mode 100644 index 000000000000..d05c6098dca0 --- /dev/null +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/DirectParquetWriter.scala @@ -0,0 +1,81 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.spark.sql.execution.datasources.parquet + +import scala.collection.JavaConverters._ + +import org.apache.hadoop.conf +import org.apache.hadoop.fs.Path +import org.apache.parquet.hadoop.ParquetWriter +import org.apache.parquet.hadoop.api.WriteSupport +import org.apache.parquet.hadoop.api.WriteSupport.WriteContext +import org.apache.parquet.io.api.RecordConsumer +import org.apache.parquet.schema.{MessageType, MessageTypeParser} + +private[sql] object DirectParquetWriter { + type RecordBuilder = RecordConsumer => Unit + + /** + * A testing Parquet [[WriteSupport]] implementation used to write manually constructed Parquet + * records with arbitrary structures. + */ + private class DirectWriteSupport(schema: MessageType, metadata: Map[String, String]) + extends WriteSupport[RecordBuilder] { + + private var recordConsumer: RecordConsumer = _ + + override def init(configuration: conf.Configuration): WriteContext = { + new WriteContext(schema, metadata.asJava) + } + + override def write(buildRecord: RecordBuilder): Unit = { + recordConsumer.startMessage() + buildRecord(recordConsumer) + recordConsumer.endMessage() + } + + override def prepareForWrite(recordConsumer: RecordConsumer): Unit = { + this.recordConsumer = recordConsumer + } + } + + def writeDirect + (path: String, schema: String, metadata: Map[String, String] = Map.empty) + (f: ParquetWriter[RecordBuilder] => Unit): Unit = { + val messageType = MessageTypeParser.parseMessageType(schema) + val writeSupport = new DirectWriteSupport(messageType, metadata) + val parquetWriter = new ParquetWriter[RecordBuilder](new Path(path), writeSupport) + try f(parquetWriter) finally parquetWriter.close() + } + + def message(writer: ParquetWriter[RecordBuilder])(builder: RecordBuilder): Unit = { + writer.write(builder) + } + + def group(consumer: RecordConsumer)(f: => Unit): Unit = { + consumer.startGroup() + f + consumer.endGroup() + } + + def field(consumer: RecordConsumer, name: String, index: Int = 0)(f: => Unit): Unit = { + consumer.startField(name, index) + f + consumer.endField(name, index) + } +} diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetInteroperabilitySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetInteroperabilitySuite.scala new file mode 100644 index 000000000000..d17d9304efb2 --- /dev/null +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetInteroperabilitySuite.scala @@ -0,0 +1,111 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.spark.sql.execution.datasources.parquet + +import java.io.File + +import org.apache.spark.sql.Row +import org.apache.spark.sql.test.SharedSQLContext + +class ParquetInteroperabilitySuite extends ParquetCompatibilityTest with SharedSQLContext { + test("parquet files with different physical schemas but share the same logical schema") { + // This test case writes two Parquet files, both representing the following Catalyst schema + // + // StructType( + // StructField( + // "f", + // ArrayType(IntegerType, containsNull = false), + // nullable = false)) + // + // The first Parquet file comes with parquet-avro style 2-level LIST-annotated repeated group, + // while the other one comes with parquet-protobuf style 1-level unannotated repeated primitive + // field. + withTempDir { dir => + import DirectParquetWriter._ + + val avroStylePath = new File(dir, "avro-style").getCanonicalPath + val protobufStylePath = new File(dir, "protobuf-style").getCanonicalPath + + val avroStyleSchema = + """message avro_style { + | required group f (LIST) { + | repeated int32 array; + | } + |} + """.stripMargin + + writeDirect(avroStylePath, avroStyleSchema) { writer => + message(writer) { rc => + field(rc, "f") { + group(rc) { + field(rc, "array") { + rc.addInteger(0) + rc.addInteger(1) + } + } + } + } + + message(writer) { rc => + field(rc, "f") { + group(rc) { + field(rc, "array") { + rc.addInteger(2) + rc.addInteger(3) + } + } + } + } + } + + logParquetSchema(avroStylePath) + + val protobufStyleSchema = + """message protobuf_style { + | repeated int32 f; + |} + """.stripMargin + + writeDirect(protobufStylePath, protobufStyleSchema) { writer => + message(writer) { rc => + field(rc, "f") { + rc.addInteger(4) + rc.addInteger(5) + } + } + + message(writer) { rc => + field(rc, "f") { + rc.addInteger(6) + rc.addInteger(7) + } + } + } + + logParquetSchema(protobufStylePath) + + checkAnswer( + sqlContext.read.parquet(dir.getCanonicalPath), + Seq( + Row(Seq(0, 1)), + Row(Seq(2, 3)), + Row(Seq(4, 5)), + Row(Seq(6, 7)))) + } + } +} diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetQuerySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetQuerySuite.scala index b7b70c2bbbd5..2cfa42d8cad9 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetQuerySuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetQuerySuite.scala @@ -22,6 +22,9 @@ import java.io.File import org.apache.hadoop.fs.Path import org.apache.spark.sql._ +import org.apache.spark.sql.catalyst.InternalRow +import org.apache.spark.sql.catalyst.expressions.SpecificMutableRow +import org.apache.spark.sql.execution.datasources.parquet.TestingUDT.{NestedStructUDT, NestedStruct} import org.apache.spark.sql.test.SharedSQLContext import org.apache.spark.sql.types._ import org.apache.spark.util.Utils @@ -229,4 +232,323 @@ class ParquetQuerySuite extends QueryTest with ParquetTest with SharedSQLContext } } } + + test("SPARK-10301 requested schema clipping - same schema") { + withTempPath { dir => + val path = dir.getCanonicalPath + val df = sqlContext.range(1).selectExpr("NAMED_STRUCT('a', id, 'b', id + 1) AS s").coalesce(1) + df.write.parquet(path) + + val userDefinedSchema = + new StructType() + .add( + "s", + new StructType() + .add("a", LongType, nullable = true) + .add("b", LongType, nullable = true), + nullable = true) + + checkAnswer( + 
sqlContext.read.schema(userDefinedSchema).parquet(path), + Row(Row(0L, 1L))) + } + } + + // This test case is ignored because of parquet-mr bug PARQUET-370 + ignore("SPARK-10301 requested schema clipping - schemas with disjoint sets of fields") { + withTempPath { dir => + val path = dir.getCanonicalPath + val df = sqlContext.range(1).selectExpr("NAMED_STRUCT('a', id, 'b', id + 1) AS s").coalesce(1) + df.write.parquet(path) + + val userDefinedSchema = + new StructType() + .add( + "s", + new StructType() + .add("c", LongType, nullable = true) + .add("d", LongType, nullable = true), + nullable = true) + + checkAnswer( + sqlContext.read.schema(userDefinedSchema).parquet(path), + Row(Row(null, null))) + } + } + + test("SPARK-10301 requested schema clipping - requested schema contains physical schema") { + withTempPath { dir => + val path = dir.getCanonicalPath + val df = sqlContext.range(1).selectExpr("NAMED_STRUCT('a', id, 'b', id + 1) AS s").coalesce(1) + df.write.parquet(path) + + val userDefinedSchema = + new StructType() + .add( + "s", + new StructType() + .add("a", LongType, nullable = true) + .add("b", LongType, nullable = true) + .add("c", LongType, nullable = true) + .add("d", LongType, nullable = true), + nullable = true) + + checkAnswer( + sqlContext.read.schema(userDefinedSchema).parquet(path), + Row(Row(0L, 1L, null, null))) + } + + withTempPath { dir => + val path = dir.getCanonicalPath + val df = sqlContext.range(1).selectExpr("NAMED_STRUCT('a', id, 'd', id + 3) AS s").coalesce(1) + df.write.parquet(path) + + val userDefinedSchema = + new StructType() + .add( + "s", + new StructType() + .add("a", LongType, nullable = true) + .add("b", LongType, nullable = true) + .add("c", LongType, nullable = true) + .add("d", LongType, nullable = true), + nullable = true) + + checkAnswer( + sqlContext.read.schema(userDefinedSchema).parquet(path), + Row(Row(0L, null, null, 3L))) + } + } + + test("SPARK-10301 requested schema clipping - physical schema contains requested schema") { + withTempPath { dir => + val path = dir.getCanonicalPath + val df = sqlContext + .range(1) + .selectExpr("NAMED_STRUCT('a', id, 'b', id + 1, 'c', id + 2, 'd', id + 3) AS s") + .coalesce(1) + + df.write.parquet(path) + + val userDefinedSchema = + new StructType() + .add( + "s", + new StructType() + .add("a", LongType, nullable = true) + .add("b", LongType, nullable = true), + nullable = true) + + checkAnswer( + sqlContext.read.schema(userDefinedSchema).parquet(path), + Row(Row(0L, 1L))) + } + + withTempPath { dir => + val path = dir.getCanonicalPath + val df = sqlContext + .range(1) + .selectExpr("NAMED_STRUCT('a', id, 'b', id + 1, 'c', id + 2, 'd', id + 3) AS s") + .coalesce(1) + + df.write.parquet(path) + + val userDefinedSchema = + new StructType() + .add( + "s", + new StructType() + .add("a", LongType, nullable = true) + .add("d", LongType, nullable = true), + nullable = true) + + checkAnswer( + sqlContext.read.schema(userDefinedSchema).parquet(path), + Row(Row(0L, 3L))) + } + } + + test("SPARK-10301 requested schema clipping - schemas overlap but don't contain each other") { + withTempPath { dir => + val path = dir.getCanonicalPath + val df = sqlContext + .range(1) + .selectExpr("NAMED_STRUCT('a', id, 'b', id + 1, 'c', id + 2) AS s") + .coalesce(1) + + df.write.parquet(path) + + val userDefinedSchema = + new StructType() + .add( + "s", + new StructType() + .add("b", LongType, nullable = true) + .add("c", LongType, nullable = true) + .add("d", LongType, nullable = true), + nullable = true) + + checkAnswer( + 
sqlContext.read.schema(userDefinedSchema).parquet(path), + Row(Row(1L, 2L, null))) + } + } + + test("SPARK-10301 requested schema clipping - deeply nested struct") { + withTempPath { dir => + val path = dir.getCanonicalPath + + val df = sqlContext + .range(1) + .selectExpr("NAMED_STRUCT('a', ARRAY(NAMED_STRUCT('b', id, 'c', id))) AS s") + .coalesce(1) + + df.write.parquet(path) + + val userDefinedSchema = new StructType() + .add("s", + new StructType() + .add( + "a", + ArrayType( + new StructType() + .add("b", LongType, nullable = true) + .add("d", StringType, nullable = true), + containsNull = true), + nullable = true), + nullable = true) + + checkAnswer( + sqlContext.read.schema(userDefinedSchema).parquet(path), + Row(Row(Seq(Row(0, null))))) + } + } + + test("SPARK-10301 requested schema clipping - out of order") { + withTempPath { dir => + val path = dir.getCanonicalPath + + val df1 = sqlContext + .range(1) + .selectExpr("NAMED_STRUCT('a', id, 'b', id + 1, 'c', id + 2) AS s") + .coalesce(1) + + val df2 = sqlContext + .range(1, 2) + .selectExpr("NAMED_STRUCT('c', id + 2, 'b', id + 1, 'd', id + 3) AS s") + .coalesce(1) + + df1.write.parquet(path) + df2.write.mode(SaveMode.Append).parquet(path) + + val userDefinedSchema = new StructType() + .add("s", + new StructType() + .add("a", LongType, nullable = true) + .add("b", LongType, nullable = true) + .add("d", LongType, nullable = true), + nullable = true) + + checkAnswer( + sqlContext.read.schema(userDefinedSchema).parquet(path), + Seq( + Row(Row(0, 1, null)), + Row(Row(null, 2, 4)))) + } + } + + test("SPARK-10301 requested schema clipping - schema merging") { + withTempPath { dir => + val path = dir.getCanonicalPath + + val df1 = sqlContext + .range(1) + .selectExpr("NAMED_STRUCT('a', id, 'c', id + 2) AS s") + .coalesce(1) + + val df2 = sqlContext + .range(1, 2) + .selectExpr("NAMED_STRUCT('a', id, 'b', id + 1, 'c', id + 2) AS s") + .coalesce(1) + + df1.write.mode(SaveMode.Append).parquet(path) + df2.write.mode(SaveMode.Append).parquet(path) + + checkAnswer( + sqlContext + .read + .option("mergeSchema", "true") + .parquet(path) + .selectExpr("s.a", "s.b", "s.c"), + Seq( + Row(0, null, 2), + Row(1, 2, 3))) + } + } + + test("SPARK-10301 requested schema clipping - UDT") { + withTempPath { dir => + val path = dir.getCanonicalPath + + val df = sqlContext + .range(1) + .selectExpr( + """NAMED_STRUCT( + | 'f0', CAST(id AS STRING), + | 'f1', NAMED_STRUCT( + | 'a', CAST(id + 1 AS INT), + | 'b', CAST(id + 2 AS LONG), + | 'c', CAST(id + 3.5 AS DOUBLE) + | ) + |) AS s + """.stripMargin) + .coalesce(1) + + df.write.mode(SaveMode.Append).parquet(path) + + val userDefinedSchema = + new StructType() + .add( + "s", + new StructType() + .add("f1", new NestedStructUDT, nullable = true), + nullable = true) + + checkAnswer( + sqlContext.read.schema(userDefinedSchema).parquet(path), + Row(Row(NestedStruct(1, 2L, 3.5D)))) + } + } +} + +object TestingUDT { + @SQLUserDefinedType(udt = classOf[NestedStructUDT]) + case class NestedStruct(a: Integer, b: Long, c: Double) + + class NestedStructUDT extends UserDefinedType[NestedStruct] { + override def sqlType: DataType = + new StructType() + .add("a", IntegerType, nullable = true) + .add("b", LongType, nullable = false) + .add("c", DoubleType, nullable = false) + + override def serialize(obj: Any): Any = { + val row = new SpecificMutableRow(sqlType.asInstanceOf[StructType].map(_.dataType)) + obj match { + case n: NestedStruct => + row.setInt(0, n.a) + row.setLong(1, n.b) + row.setDouble(2, n.c) + } + } + + override 
def userClass: Class[NestedStruct] = classOf[NestedStruct] + + override def deserialize(datum: Any): NestedStruct = { + datum match { + case row: InternalRow => + NestedStruct(row.getInt(0), row.getLong(1), row.getDouble(2)) + } + } + } } diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetSchemaSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetSchemaSuite.scala index b344616a9b29..eb7192d40046 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetSchemaSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetSchemaSuite.scala @@ -941,4 +941,531 @@ class ParquetSchemaSuite extends ParquetSchemaTest { | optional fixed_len_byte_array(8) f1 (DECIMAL(18, 3)); |} """.stripMargin) + + private def testSchemaClipping( + testName: String, + parquetSchema: String, + catalystSchema: StructType, + expectedSchema: String): Unit = { + test(s"Clipping - $testName") { + val expected = MessageTypeParser.parseMessageType(expectedSchema) + val actual = CatalystReadSupport.clipParquetSchema( + MessageTypeParser.parseMessageType(parquetSchema), catalystSchema) + + try { + expected.checkContains(actual) + actual.checkContains(expected) + } catch { case cause: Throwable => + fail( + s"""Expected clipped schema: + |$expected + |Actual clipped schema: + |$actual + """.stripMargin, + cause) + } + } + } + + testSchemaClipping( + "simple nested struct", + + parquetSchema = + """message root { + | required group f0 { + | optional int32 f00; + | optional int32 f01; + | } + |} + """.stripMargin, + + catalystSchema = { + val f0Type = new StructType().add("f00", IntegerType, nullable = true) + new StructType() + .add("f0", f0Type, nullable = false) + .add("f1", IntegerType, nullable = true) + }, + + expectedSchema = + """message root { + | required group f0 { + | optional int32 f00; + | } + | optional int32 f1; + |} + """.stripMargin) + + testSchemaClipping( + "parquet-protobuf style array", + + parquetSchema = + """message root { + | required group f0 { + | repeated binary f00 (UTF8); + | repeated group f01 { + | optional int32 f010; + | optional double f011; + | } + | } + |} + """.stripMargin, + + catalystSchema = { + val f00Type = ArrayType(StringType, containsNull = false) + val f01Type = ArrayType( + new StructType() + .add("f011", DoubleType, nullable = true), + containsNull = false) + + val f0Type = new StructType() + .add("f00", f00Type, nullable = false) + .add("f01", f01Type, nullable = false) + val f1Type = ArrayType(IntegerType, containsNull = true) + + new StructType() + .add("f0", f0Type, nullable = false) + .add("f1", f1Type, nullable = true) + }, + + expectedSchema = + """message root { + | required group f0 { + | repeated binary f00 (UTF8); + | repeated group f01 { + | optional double f011; + | } + | } + | + | optional group f1 (LIST) { + | repeated group list { + | optional int32 element; + | } + | } + |} + """.stripMargin) + + testSchemaClipping( + "parquet-thrift style array", + + parquetSchema = + """message root { + | required group f0 { + | optional group f00 (LIST) { + | repeated binary f00_tuple (UTF8); + | } + | + | optional group f01 (LIST) { + | repeated group f01_tuple { + | optional int32 f010; + | optional double f011; + | } + | } + | } + |} + """.stripMargin, + + catalystSchema = { + val f01ElementType = new StructType() + .add("f011", DoubleType, nullable = true) + .add("f012", LongType, nullable = true) + + val f0Type 
= new StructType() + .add("f00", ArrayType(StringType, containsNull = false), nullable = true) + .add("f01", ArrayType(f01ElementType, containsNull = false), nullable = true) + + new StructType().add("f0", f0Type, nullable = false) + }, + + expectedSchema = + """message root { + | required group f0 { + | optional group f00 (LIST) { + | repeated binary f00_tuple (UTF8); + | } + | + | optional group f01 (LIST) { + | repeated group f01_tuple { + | optional double f011; + | optional int64 f012; + | } + | } + | } + |} + """.stripMargin) + + testSchemaClipping( + "parquet-avro style array", + + parquetSchema = + """message root { + | required group f0 { + | optional group f00 (LIST) { + | repeated binary array (UTF8); + | } + | + | optional group f01 (LIST) { + | repeated group array { + | optional int32 f010; + | optional double f011; + | } + | } + | } + |} + """.stripMargin, + + catalystSchema = { + val f01ElementType = new StructType() + .add("f011", DoubleType, nullable = true) + .add("f012", LongType, nullable = true) + + val f0Type = new StructType() + .add("f00", ArrayType(StringType, containsNull = false), nullable = true) + .add("f01", ArrayType(f01ElementType, containsNull = false), nullable = true) + + new StructType().add("f0", f0Type, nullable = false) + }, + + expectedSchema = + """message root { + | required group f0 { + | optional group f00 (LIST) { + | repeated binary array (UTF8); + | } + | + | optional group f01 (LIST) { + | repeated group array { + | optional double f011; + | optional int64 f012; + | } + | } + | } + |} + """.stripMargin) + + testSchemaClipping( + "parquet-hive style array", + + parquetSchema = + """message root { + | optional group f0 { + | optional group f00 (LIST) { + | repeated group bag { + | optional binary array_element; + | } + | } + | + | optional group f01 (LIST) { + | repeated group bag { + | optional group array_element { + | optional int32 f010; + | optional double f011; + | } + | } + | } + | } + |} + """.stripMargin, + + catalystSchema = { + val f01ElementType = new StructType() + .add("f011", DoubleType, nullable = true) + .add("f012", LongType, nullable = true) + + val f0Type = new StructType() + .add("f00", ArrayType(StringType, containsNull = true), nullable = true) + .add("f01", ArrayType(f01ElementType, containsNull = true), nullable = true) + + new StructType().add("f0", f0Type, nullable = true) + }, + + expectedSchema = + """message root { + | optional group f0 { + | optional group f00 (LIST) { + | repeated group bag { + | optional binary array_element; + | } + | } + | + | optional group f01 (LIST) { + | repeated group bag { + | optional group array_element { + | optional double f011; + | optional int64 f012; + | } + | } + | } + | } + |} + """.stripMargin) + + testSchemaClipping( + "2-level list of required struct", + + parquetSchema = + s"""message root { + | required group f0 { + | required group f00 (LIST) { + | repeated group element { + | required int32 f000; + | optional int64 f001; + | } + | } + | } + |} + """.stripMargin, + + catalystSchema = { + val f00ElementType = + new StructType() + .add("f001", LongType, nullable = true) + .add("f002", DoubleType, nullable = false) + + val f00Type = ArrayType(f00ElementType, containsNull = false) + val f0Type = new StructType().add("f00", f00Type, nullable = false) + + new StructType().add("f0", f0Type, nullable = false) + }, + + expectedSchema = + s"""message root { + | required group f0 { + | required group f00 (LIST) { + | repeated group element { + | optional int64 f001; + | required 
double f002; + | } + | } + | } + |} + """.stripMargin) + + testSchemaClipping( + "standard array", + + parquetSchema = + """message root { + | required group f0 { + | optional group f00 (LIST) { + | repeated group list { + | required binary element (UTF8); + | } + | } + | + | optional group f01 (LIST) { + | repeated group list { + | required group element { + | optional int32 f010; + | optional double f011; + | } + | } + | } + | } + |} + """.stripMargin, + + catalystSchema = { + val f01ElementType = new StructType() + .add("f011", DoubleType, nullable = true) + .add("f012", LongType, nullable = true) + + val f0Type = new StructType() + .add("f00", ArrayType(StringType, containsNull = false), nullable = true) + .add("f01", ArrayType(f01ElementType, containsNull = false), nullable = true) + + new StructType().add("f0", f0Type, nullable = false) + }, + + expectedSchema = + """message root { + | required group f0 { + | optional group f00 (LIST) { + | repeated group list { + | required binary element (UTF8); + | } + | } + | + | optional group f01 (LIST) { + | repeated group list { + | required group element { + | optional double f011; + | optional int64 f012; + | } + | } + | } + | } + |} + """.stripMargin) + + testSchemaClipping( + "empty requested schema", + + parquetSchema = + """message root { + | required group f0 { + | required int32 f00; + | required int64 f01; + | } + |} + """.stripMargin, + + catalystSchema = new StructType(), + + expectedSchema = "message root {}") + + testSchemaClipping( + "disjoint field sets", + + parquetSchema = + """message root { + | required group f0 { + | required int32 f00; + | required int64 f01; + | } + |} + """.stripMargin, + + catalystSchema = + new StructType() + .add( + "f0", + new StructType() + .add("f02", FloatType, nullable = true) + .add("f03", DoubleType, nullable = true), + nullable = true), + + expectedSchema = + """message root { + | required group f0 { + | optional float f02; + | optional double f03; + | } + |} + """.stripMargin) + + testSchemaClipping( + "parquet-avro style map", + + parquetSchema = + """message root { + | required group f0 (MAP) { + | repeated group map (MAP_KEY_VALUE) { + | required int32 key; + | required group value { + | required int32 value_f0; + | required int64 value_f1; + | } + | } + | } + |} + """.stripMargin, + + catalystSchema = { + val valueType = + new StructType() + .add("value_f1", LongType, nullable = false) + .add("value_f2", DoubleType, nullable = false) + + val f0Type = MapType(IntegerType, valueType, valueContainsNull = false) + + new StructType().add("f0", f0Type, nullable = false) + }, + + expectedSchema = + """message root { + | required group f0 (MAP) { + | repeated group map (MAP_KEY_VALUE) { + | required int32 key; + | required group value { + | required int64 value_f1; + | required double value_f2; + | } + | } + | } + |} + """.stripMargin) + + testSchemaClipping( + "standard map", + + parquetSchema = + """message root { + | required group f0 (MAP) { + | repeated group key_value { + | required int32 key; + | required group value { + | required int32 value_f0; + | required int64 value_f1; + | } + | } + | } + |} + """.stripMargin, + + catalystSchema = { + val valueType = + new StructType() + .add("value_f1", LongType, nullable = false) + .add("value_f2", DoubleType, nullable = false) + + val f0Type = MapType(IntegerType, valueType, valueContainsNull = false) + + new StructType().add("f0", f0Type, nullable = false) + }, + + expectedSchema = + """message root { + | required group f0 (MAP) { + | repeated 
group key_value { + | required int32 key; + | required group value { + | required int64 value_f1; + | required double value_f2; + | } + | } + | } + |} + """.stripMargin) + + testSchemaClipping( + "standard map with complex key", + + parquetSchema = + """message root { + | required group f0 (MAP) { + | repeated group key_value { + | required group key { + | required int32 value_f0; + | required int64 value_f1; + | } + | required int32 value; + | } + | } + |} + """.stripMargin, + + catalystSchema = { + val keyType = + new StructType() + .add("value_f1", LongType, nullable = false) + .add("value_f2", DoubleType, nullable = false) + + val f0Type = MapType(keyType, IntegerType, valueContainsNull = false) + + new StructType().add("f0", f0Type, nullable = false) + }, + + expectedSchema = + """message root { + | required group f0 (MAP) { + | repeated group key_value { + | required group key { + | required int64 value_f1; + | required double value_f2; + | } + | required int32 value; + | } + | } + |} + """.stripMargin) } From d4b00c5c326e3082f9861acbd89909373b2541b3 Mon Sep 17 00:00:00 2001 From: zsxwing Date: Tue, 8 Sep 2015 20:39:15 -0700 Subject: [PATCH 15/15] [SPARK-10071] [STREAMING] Output a warning when writing QueueInputDStream and throw a better exception when reading QueueInputDStream Output a warning when serializing QueueInputDStream rather than throwing an exception to allow unit tests use it. Moreover, this PR also throws an better exception when deserializing QueueInputDStream to make the user find out the problem easily. The previous exception is hard to understand: https://issues.apache.org/jira/browse/SPARK-8553 Author: zsxwing Closes #8624 from zsxwing/SPARK-10071 and squashes the following commits: 847cfa8 [zsxwing] Output a warning when writing QueueInputDStream and throw a better exception when reading QueueInputDStream (cherry picked from commit 820913f554bef610d07ca2dadaead657f916ae63) Signed-off-by: Tathagata Das --- .../apache/spark/streaming/Checkpoint.scala | 6 ++-- .../streaming/dstream/QueueInputDStream.scala | 9 ++++-- .../streaming/StreamingContextSuite.scala | 28 +++++++++++++------ 3 files changed, 30 insertions(+), 13 deletions(-) diff --git a/streaming/src/main/scala/org/apache/spark/streaming/Checkpoint.scala b/streaming/src/main/scala/org/apache/spark/streaming/Checkpoint.scala index 3985e1a3d9df..27024ecfd910 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/Checkpoint.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/Checkpoint.scala @@ -321,7 +321,7 @@ object CheckpointReader extends Logging { // Try to read the checkpoint files in the order logInfo("Checkpoint files found: " + checkpointFiles.mkString(",")) - val compressionCodec = CompressionCodec.createCodec(conf) + var readError: Exception = null checkpointFiles.foreach(file => { logInfo("Attempting to load checkpoint from file " + file) try { @@ -332,13 +332,15 @@ object CheckpointReader extends Logging { return Some(cp) } catch { case e: Exception => + readError = e logWarning("Error reading checkpoint from file " + file, e) } }) // If none of checkpoint files could be read, then throw exception if (!ignoreReadError) { - throw new SparkException(s"Failed to read checkpoint from directory $checkpointPath") + throw new SparkException( + s"Failed to read checkpoint from directory $checkpointPath", readError) } None } diff --git a/streaming/src/main/scala/org/apache/spark/streaming/dstream/QueueInputDStream.scala 
b/streaming/src/main/scala/org/apache/spark/streaming/dstream/QueueInputDStream.scala index a2f5d82a79bd..bab78a3536b4 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/dstream/QueueInputDStream.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/dstream/QueueInputDStream.scala @@ -17,7 +17,7 @@ package org.apache.spark.streaming.dstream -import java.io.{NotSerializableException, ObjectOutputStream} +import java.io.{NotSerializableException, ObjectInputStream, ObjectOutputStream} import scala.collection.mutable.{ArrayBuffer, Queue} import scala.reflect.ClassTag @@ -37,8 +37,13 @@ class QueueInputDStream[T: ClassTag]( override def stop() { } + private def readObject(in: ObjectInputStream): Unit = { + throw new NotSerializableException("queueStream doesn't support checkpointing. " + + "Please don't use queueStream when checkpointing is enabled.") + } + private def writeObject(oos: ObjectOutputStream): Unit = { - throw new NotSerializableException("queueStream doesn't support checkpointing") + logWarning("queueStream doesn't support checkpointing") } override def compute(validTime: Time): Option[RDD[T]] = { diff --git a/streaming/src/test/scala/org/apache/spark/streaming/StreamingContextSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/StreamingContextSuite.scala index 7423ef6bcb6e..d26894e88fc2 100644 --- a/streaming/src/test/scala/org/apache/spark/streaming/StreamingContextSuite.scala +++ b/streaming/src/test/scala/org/apache/spark/streaming/StreamingContextSuite.scala @@ -30,7 +30,7 @@ import org.scalatest.concurrent.Timeouts import org.scalatest.exceptions.TestFailedDueToTimeoutException import org.scalatest.time.SpanSugar._ -import org.apache.spark.{Logging, SparkConf, SparkContext, SparkFunSuite} +import org.apache.spark._ import org.apache.spark.metrics.MetricsSystem import org.apache.spark.metrics.source.Source import org.apache.spark.storage.StorageLevel @@ -726,16 +726,26 @@ class StreamingContextSuite extends SparkFunSuite with BeforeAndAfter with Timeo } test("queueStream doesn't support checkpointing") { - val checkpointDir = Utils.createTempDir() - ssc = new StreamingContext(master, appName, batchDuration) - val rdd = ssc.sparkContext.parallelize(1 to 10) - ssc.queueStream[Int](Queue(rdd)).print() - ssc.checkpoint(checkpointDir.getAbsolutePath) - val e = intercept[NotSerializableException] { - ssc.start() + val checkpointDirectory = Utils.createTempDir().getAbsolutePath() + def creatingFunction(): StreamingContext = { + val _ssc = new StreamingContext(conf, batchDuration) + val rdd = _ssc.sparkContext.parallelize(1 to 10) + _ssc.checkpoint(checkpointDirectory) + _ssc.queueStream[Int](Queue(rdd)).register() + _ssc + } + ssc = StreamingContext.getOrCreate(checkpointDirectory, creatingFunction _) + ssc.start() + eventually(timeout(10000 millis)) { + assert(Checkpoint.getCheckpointFiles(checkpointDirectory).size > 1) + } + ssc.stop() + val e = intercept[SparkException] { + ssc = StreamingContext.getOrCreate(checkpointDirectory, creatingFunction _) } // StreamingContext.validate changes the message, so use "contains" here - assert(e.getMessage.contains("queueStream doesn't support checkpointing")) + assert(e.getCause.getMessage.contains("queueStream doesn't support checkpointing. " + + "Please don't use queueStream when checkpointing is enabled.")) } def addInputStream(s: StreamingContext): DStream[Int] = {
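As a minimal hedged sketch, not part of the patch series itself: the behaviour the last patch describes can be reproduced with a small standalone program. It assumes a local master, Spark 1.5-era streaming APIs, and an invented checkpoint path and object name; it is illustrative only. The first run merely logs the new warning when the DStream graph containing a queueStream is checkpointed; running it again against the same directory recovers from the checkpoint and fails with a SparkException whose cause carries the clearer NotSerializableException message.

    import scala.collection.mutable.Queue

    import org.apache.spark.SparkConf
    import org.apache.spark.streaming.{Seconds, StreamingContext}

    object QueueStreamCheckpointSketch {
      def main(args: Array[String]): Unit = {
        val conf = new SparkConf().setMaster("local[2]").setAppName("QueueStreamCheckpointSketch")
        // Hypothetical checkpoint location, used only for illustration.
        val checkpointDir = "/tmp/queue-stream-checkpoint"

        def createContext(): StreamingContext = {
          val ssc = new StreamingContext(conf, Seconds(1))
          ssc.checkpoint(checkpointDir)
          val rdd = ssc.sparkContext.parallelize(1 to 10)
          // After the patch, serializing this DStream into a checkpoint logs a warning
          // instead of throwing.
          ssc.queueStream[Int](Queue(rdd)).print()
          ssc
        }

        // First run: no checkpoint exists, so createContext() builds a fresh context and
        // checkpoints are written as batches complete. Running the program again against
        // the same directory deserializes the graph and fails with a SparkException whose
        // cause is the NotSerializableException thrown by QueueInputDStream.readObject.
        val ssc = StreamingContext.getOrCreate(checkpointDir, createContext _)
        ssc.start()
        ssc.awaitTerminationOrTimeout(5000)
        ssc.stop()
      }
    }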
 <tr><th>Property Name</th><th>Default</th><th>Meaning</th></tr>
+<tr>
+  <td><code>spark.streaming.backpressure.enabled</code></td>
+  <td>false</td>
+  <td>
+    Enables or disables Spark Streaming's internal backpressure mechanism (since 1.5).
+    This enables Spark Streaming to control the receiving rate based on the
+    current batch scheduling delays and processing times, so that the system receives
+    data only as fast as it can process it. Internally, this dynamically sets the
+    maximum receiving rate of receivers. This rate is upper bounded by the values
+    `spark.streaming.receiver.maxRate` and `spark.streaming.kafka.maxRatePerPartition`
+    if they are set (see below).
+  </td>
+</tr>
 <tr>
   <td><code>spark.streaming.blockInterval</code></td>
   <td>200ms</td>
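The table fragment above documents `spark.streaming.backpressure.enabled`. As a minimal sketch, assuming a local master, a socket input source, and an arbitrary receiver rate cap of 10000 records per second (illustrative values, not recommendations), the related properties could be set programmatically like this:

    import org.apache.spark.SparkConf
    import org.apache.spark.streaming.{Seconds, StreamingContext}

    object BackpressureConfSketch {
      def main(args: Array[String]): Unit = {
        val conf = new SparkConf()
          .setMaster("local[2]")
          .setAppName("BackpressureConfSketch")
          // Let Spark Streaming adapt the receiving rate to the current batch scheduling
          // delays and processing times (available since 1.5).
          .set("spark.streaming.backpressure.enabled", "true")
          // Optional upper bound on the dynamic rate, per receiver, in records per second;
          // 10000 is an arbitrary illustrative value.
          .set("spark.streaming.receiver.maxRate", "10000")

        val ssc = new StreamingContext(conf, Seconds(1))
        // Assumed input source for illustration; any receiver-based stream is subject to
        // the dynamically computed rate limit.
        val lines = ssc.socketTextStream("localhost", 9999)
        lines.count().print()

        ssc.start()
        ssc.awaitTermination()
      }
    }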