sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicOperators.scala
@@ -186,12 +186,6 @@ case class WithWindowDefinition(
override def output: Seq[Attribute] = child.output
}

- case class WriteToFile(
-     path: String,
-     child: LogicalPlan) extends UnaryNode {
-   override def output: Seq[Attribute] = child.output
- }

/**
* @param order The ordering expressions
* @param global True means global sorting apply for entire data set,
9 changes: 2 additions & 7 deletions sql/core/src/main/scala/org/apache/spark/sql/DataFrame.scala
@@ -139,8 +139,7 @@ class DataFrame private[sql](
// happen right away to let these side effects take place eagerly.
case _: Command |
_: InsertIntoTable |
-        _: CreateTableUsingAsSelect |
-        _: WriteToFile =>
+        _: CreateTableUsingAsSelect =>
LogicalRDD(queryExecution.analyzed.output, queryExecution.toRdd)(sqlContext)
case _ =>
queryExecution.analyzed
@@ -1615,11 +1614,7 @@ class DataFrame private[sql](
*/
@deprecated("Use write.parquet(path)", "1.4.0")
def saveAsParquetFile(path: String): Unit = {
-    if (sqlContext.conf.parquetUseDataSourceApi) {
-      write.format("parquet").mode(SaveMode.ErrorIfExists).save(path)
-    } else {
-      sqlContext.executePlan(WriteToFile(path, logicalPlan)).toRdd
-    }
+    write.format("parquet").mode(SaveMode.ErrorIfExists).save(path)
}
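Both branches of the old method collapse into the data source write path. As a minimal usage sketch (assuming an existing `df: DataFrame` and an output directory string `path`, both hypothetical), the deprecated call and its preferred replacement now do the same thing:

    import org.apache.spark.sql.SaveMode

    // Deprecated entry point, kept only for source compatibility:
    df.saveAsParquetFile(path)
    // Preferred replacement, identical to the new method body:
    df.write.format("parquet").mode(SaveMode.ErrorIfExists).save(path)
    // or, more concisely:
    df.write.mode(SaveMode.ErrorIfExists).parquet(path)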

/**
sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala
@@ -21,16 +21,16 @@ import java.util.Properties

import org.apache.hadoop.fs.Path

- import org.apache.spark.{Logging, Partition}
import org.apache.spark.annotation.Experimental
import org.apache.spark.api.java.JavaRDD
import org.apache.spark.deploy.SparkHadoopUtil
import org.apache.spark.rdd.RDD
- import org.apache.spark.sql.execution.datasources.{ResolvedDataSource, LogicalRelation}
+ import org.apache.spark.sql.execution.datasources.{LogicalRelation, ResolvedDataSource}
import org.apache.spark.sql.jdbc.{JDBCPartition, JDBCPartitioningInfo, JDBCRelation}
import org.apache.spark.sql.json.JSONRelation
- import org.apache.spark.sql.parquet.ParquetRelation2
+ import org.apache.spark.sql.parquet.ParquetRelation
import org.apache.spark.sql.types.StructType
+ import org.apache.spark.{Logging, Partition}

/**
* :: Experimental ::
@@ -259,7 +259,7 @@ class DataFrameReader private[sql](sqlContext: SQLContext) extends Logging {
}.toArray

sqlContext.baseRelationToDataFrame(
-      new ParquetRelation2(
+      new ParquetRelation(
globbedPaths.map(_.toString), None, None, extraOptions.toMap)(sqlContext))
}
}
6 changes: 0 additions & 6 deletions sql/core/src/main/scala/org/apache/spark/sql/SQLConf.scala
@@ -276,10 +276,6 @@ private[spark] object SQLConf {
defaultValue = Some(true),
doc = "Enables Parquet filter push-down optimization when set to true.")

-  val PARQUET_USE_DATA_SOURCE_API = booleanConf("spark.sql.parquet.useDataSourceApi",
-    defaultValue = Some(true),
-    doc = "<TODO>")

val PARQUET_FOLLOW_PARQUET_FORMAT_SPEC = booleanConf(
key = "spark.sql.parquet.followParquetFormatSpec",
defaultValue = Some(false),
@@ -456,8 +452,6 @@ private[sql] class SQLConf extends Serializable with CatalystConf {

private[spark] def parquetFilterPushDown: Boolean = getConf(PARQUET_FILTER_PUSHDOWN_ENABLED)

-  private[spark] def parquetUseDataSourceApi: Boolean = getConf(PARQUET_USE_DATA_SOURCE_API)

private[spark] def orcFilterPushDown: Boolean = getConf(ORC_FILTER_PUSHDOWN_ENABLED)

private[spark] def verifyPartitionPath: Boolean = getConf(HIVE_VERIFY_PARTITION_PATH)
sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala
@@ -870,7 +870,6 @@ class SQLContext(@transient val sparkContext: SparkContext)
LeftSemiJoin ::
HashJoin ::
InMemoryScans ::
-      ParquetOperations ::
BasicOperators ::
CartesianProduct ::
BroadcastNestedLoopJoin :: Nil)
@@ -1115,11 +1114,8 @@ class SQLContext(@transient val sparkContext: SparkContext)
def parquetFile(paths: String*): DataFrame = {
if (paths.isEmpty) {
emptyDataFrame
-    } else if (conf.parquetUseDataSourceApi) {
-      read.parquet(paths : _*)
    } else {
-      DataFrame(this, parquet.ParquetRelation(
-        paths.mkString(","), Some(sparkContext.hadoopConfiguration), this))
+      read.parquet(paths : _*)
}
}
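With the `parquetUseDataSourceApi` branch gone, the deprecated `parquetFile` is now a plain wrapper over `DataFrameReader`. A small usage sketch (the paths are hypothetical; assumes an existing `sqlContext`):

    // Deprecated variadic entry point:
    val df1 = sqlContext.parquetFile("/data/events-a", "/data/events-b")
    // Equivalent call through the reader API, which is what the method now delegates to:
    val df2 = sqlContext.read.parquet("/data/events-a", "/data/events-b")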

sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala
@@ -17,19 +17,18 @@

package org.apache.spark.sql.execution

- import org.apache.spark.sql.{SQLContext, Strategy, execution}
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.expressions._
- import org.apache.spark.sql.catalyst.expressions.aggregate.{AggregateExpression2}
+ import org.apache.spark.sql.catalyst.expressions.aggregate.AggregateExpression2
import org.apache.spark.sql.catalyst.planning._
import org.apache.spark.sql.catalyst.plans._
import org.apache.spark.sql.catalyst.plans.logical.{BroadcastHint, LogicalPlan}
import org.apache.spark.sql.catalyst.plans.physical._
import org.apache.spark.sql.columnar.{InMemoryColumnarTableScan, InMemoryRelation}
- import org.apache.spark.sql.execution.{DescribeCommand => RunnableDescribeCommand}
import org.apache.spark.sql.execution.datasources.{CreateTableUsing, CreateTempTableUsing, DescribeCommand => LogicalDescribeCommand, _}
- import org.apache.spark.sql.parquet._
+ import org.apache.spark.sql.execution.{DescribeCommand => RunnableDescribeCommand}
import org.apache.spark.sql.types._
+ import org.apache.spark.sql.{SQLContext, Strategy, execution}

private[sql] abstract class SparkStrategies extends QueryPlanner[SparkPlan] {
self: SQLContext#SparkPlanner =>
@@ -306,57 +305,6 @@ private[sql] abstract class SparkStrategies extends QueryPlanner[SparkPlan] {
}
}

-  object ParquetOperations extends Strategy {
-    def apply(plan: LogicalPlan): Seq[SparkPlan] = plan match {
-      // TODO: need to support writing to other types of files. Unify the below code paths.
-      case logical.WriteToFile(path, child) =>
-        val relation =
-          ParquetRelation.create(path, child, sparkContext.hadoopConfiguration, sqlContext)
-        // Note: overwrite=false because otherwise the metadata we just created will be deleted
-        InsertIntoParquetTable(relation, planLater(child), overwrite = false) :: Nil
-      case logical.InsertIntoTable(
-          table: ParquetRelation, partition, child, overwrite, ifNotExists) =>
-        InsertIntoParquetTable(table, planLater(child), overwrite) :: Nil
-      case PhysicalOperation(projectList, filters: Seq[Expression], relation: ParquetRelation) =>
-        val partitionColNames = relation.partitioningAttributes.map(_.name).toSet
-        val filtersToPush = filters.filter { pred =>
-          val referencedColNames = pred.references.map(_.name).toSet
-          referencedColNames.intersect(partitionColNames).isEmpty
-        }
-        val prunePushedDownFilters =
-          if (sqlContext.conf.parquetFilterPushDown) {
-            (predicates: Seq[Expression]) => {
-              // Note: filters cannot be pushed down to Parquet if they contain more complex
-              // expressions than simple "Attribute cmp Literal" comparisons. Here we remove all
-              // filters that have been pushed down. Note that a predicate such as "(A AND B) OR C"
-              // can result in "A OR C" being pushed down. Here we are conservative in the sense
-              // that even if "A" was pushed and we check for "A AND B" we still want to keep
-              // "A AND B" in the higher-level filter, not just "B".
-              predicates.map(p => p -> ParquetFilters.createFilter(p)).collect {
-                case (predicate, None) => predicate
-                // Filter needs to be applied above when it contains partitioning
-                // columns
-                case (predicate, _)
-                  if !predicate.references.map(_.name).toSet.intersect(partitionColNames).isEmpty =>
-                  predicate
-              }
-            }
-          } else {
-            identity[Seq[Expression]] _
-          }
-        pruneFilterProject(
-          projectList,
-          filters,
-          prunePushedDownFilters,
-          ParquetTableScan(
-            _,
-            relation,
-            if (sqlContext.conf.parquetFilterPushDown) filtersToPush else Nil)) :: Nil
-
-      case _ => Nil
-    }
-  }

object InMemoryScans extends Strategy {
def apply(plan: LogicalPlan): Seq[SparkPlan] = plan match {
case PhysicalOperation(projectList, filters, mem: InMemoryRelation) =>
sql/core/src/main/scala/org/apache/spark/sql/parquet/CatalystReadSupport.scala
@@ -0,0 +1,153 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.spark.sql.parquet

import java.util.{Map => JMap}

import scala.collection.JavaConversions.{iterableAsScalaIterable, mapAsJavaMap, mapAsScalaMap}

import org.apache.hadoop.conf.Configuration
import org.apache.parquet.hadoop.api.ReadSupport.ReadContext
import org.apache.parquet.hadoop.api.{InitContext, ReadSupport}
import org.apache.parquet.io.api.RecordMaterializer
import org.apache.parquet.schema.MessageType

import org.apache.spark.Logging
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.types.StructType

private[parquet] class CatalystReadSupport extends ReadSupport[InternalRow] with Logging {
override def prepareForRead(
conf: Configuration,
keyValueMetaData: JMap[String, String],
fileSchema: MessageType,
readContext: ReadContext): RecordMaterializer[InternalRow] = {
log.debug(s"Preparing for read Parquet file with message type: $fileSchema")

val toCatalyst = new CatalystSchemaConverter(conf)
val parquetRequestedSchema = readContext.getRequestedSchema

val catalystRequestedSchema =
Option(readContext.getReadSupportMetadata).map(_.toMap).flatMap { metadata =>
metadata
// First tries to read requested schema, which may result from projections
.get(CatalystReadSupport.SPARK_ROW_REQUESTED_SCHEMA)
// If not available, tries to read Catalyst schema from file metadata. It's only
// available if the target file is written by Spark SQL.
.orElse(metadata.get(CatalystReadSupport.SPARK_METADATA_KEY))
}.map(StructType.fromString).getOrElse {
logDebug("Catalyst schema not available, falling back to Parquet schema")
toCatalyst.convert(parquetRequestedSchema)
}

logDebug(s"Catalyst schema used to read Parquet files: $catalystRequestedSchema")
new CatalystRecordMaterializer(parquetRequestedSchema, catalystRequestedSchema)
}

override def init(context: InitContext): ReadContext = {
val conf = context.getConfiguration

// If the target file was written by Spark SQL, we should be able to find a serialized Catalyst
// schema of this file from its metadata.
val maybeRowSchema = Option(conf.get(RowWriteSupport.SPARK_ROW_SCHEMA))

// Optional schema of requested columns, in the form of a string serialized from a Catalyst
// `StructType` containing all requested columns.
val maybeRequestedSchema = Option(conf.get(CatalystReadSupport.SPARK_ROW_REQUESTED_SCHEMA))

// Below we construct a Parquet schema containing all requested columns. This schema tells
// Parquet which columns to read.
//
// If `maybeRequestedSchema` is defined, we assemble an equivalent Parquet schema. Otherwise,
// we have to fall back to the full file schema, which contains all columns in the file.
// Obviously this may waste IO bandwidth since it may read more columns than requested.
//
// Two things to note:
//
// 1. It's possible that some requested columns don't exist in the target Parquet file. For
// example, in the case of schema merging, the globally merged schema may contain extra
// columns gathered from other Parquet files. These columns will be simply filled with nulls
// when actually reading the target Parquet file.
//
// 2. When `maybeRequestedSchema` is available, we can't simply convert the Catalyst schema to
// Parquet schema using `CatalystSchemaConverter`, because the mapping is not unique due to
// non-standard behaviors of some Parquet libraries/tools. For example, a Parquet file
// containing a single integer array field `f1` may have the following legacy 2-level
// structure:
//
// message root {
// optional group f1 (LIST) {
// required INT32 element;
// }
// }
//
// while `CatalystSchemaConverter` may generate a standard 3-level structure:
//
// message root {
// optional group f1 (LIST) {
// repeated group list {
// required INT32 element;
// }
// }
// }
//
// Clearly, we can't use the second schema to read the target Parquet file, since the two
// have different physical structures.
val parquetRequestedSchema =
maybeRequestedSchema.fold(context.getFileSchema) { schemaString =>
val toParquet = new CatalystSchemaConverter(conf)
val fileSchema = context.getFileSchema.asGroupType()
val fileFieldNames = fileSchema.getFields.map(_.getName).toSet

StructType
// Deserializes the Catalyst schema of requested columns
.fromString(schemaString)
.map { field =>
if (fileFieldNames.contains(field.name)) {
// If the field exists in the target Parquet file, extracts the field type from the
// full file schema and makes a single-field Parquet schema
new MessageType("root", fileSchema.getType(field.name))
} else {
// Otherwise, just resorts to `CatalystSchemaConverter`
toParquet.convert(StructType(Array(field)))
}
}
// Merges all single-field Parquet schemas to form a complete schema for all requested
// columns. Note that it's possible that no columns are requested at all (e.g., when
// counting a partition column of a partitioned Parquet table). That's why `fold` is used
// here, so that we always fall back to an empty Parquet schema.
.fold(new MessageType("root")) {
_ union _
}
}

val metadata =
Map.empty[String, String] ++
maybeRequestedSchema.map(CatalystReadSupport.SPARK_ROW_REQUESTED_SCHEMA -> _) ++
maybeRowSchema.map(RowWriteSupport.SPARK_ROW_SCHEMA -> _)

logInfo(s"Going to read Parquet file with these requested columns: $parquetRequestedSchema")
new ReadContext(parquetRequestedSchema, metadata)
}
}

private[parquet] object CatalystReadSupport {
val SPARK_ROW_REQUESTED_SCHEMA = "org.apache.spark.sql.parquet.row.requested_schema"

val SPARK_METADATA_KEY = "org.apache.spark.sql.parquet.row.metadata"
}
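This hunk does not show how the new read support gets registered. As a rough sketch (the helper name is made up; `ParquetInputFormat.setReadSupportClass` is the standard parquet-hadoop hook, and the configuration key comes from the companion object above), the read path would wire it up roughly like this:

    import org.apache.hadoop.mapreduce.Job
    import org.apache.parquet.hadoop.ParquetInputFormat
    import org.apache.spark.sql.types.StructType

    // Hypothetical helper: route record materialization through CatalystReadSupport and
    // hand it the requested Catalyst schema via the key its init() method looks up.
    def configureReadSupport(job: Job, requestedSchema: StructType): Unit = {
      ParquetInputFormat.setReadSupportClass(job, classOf[CatalystReadSupport])
      job.getConfiguration.set(
        CatalystReadSupport.SPARK_ROW_REQUESTED_SCHEMA, requestedSchema.json)
    }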
sql/core/src/main/scala/org/apache/spark/sql/parquet/CatalystRecordMaterializer.scala
@@ -0,0 +1,41 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.spark.sql.parquet

import org.apache.parquet.io.api.{GroupConverter, RecordMaterializer}
import org.apache.parquet.schema.MessageType

import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.types.StructType

/**
* A [[RecordMaterializer]] for Catalyst rows.
*
* @param parquetSchema Parquet schema of the records to be read
* @param catalystSchema Catalyst schema of the rows to be constructed
*/
private[parquet] class CatalystRecordMaterializer(
parquetSchema: MessageType, catalystSchema: StructType)
extends RecordMaterializer[InternalRow] {

private val rootConverter = new CatalystRowConverter(parquetSchema, catalystSchema, NoopUpdater)

override def getCurrentRecord: InternalRow = rootConverter.currentRow

override def getRootConverter: GroupConverter = rootConverter
}
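For context on how this materializer is exercised: parquet-mr asks it for the root GroupConverter once per file and then calls getCurrentRecord after each record has been assembled. A stand-alone sketch, not part of this diff (the file path is hypothetical, and the code is assumed to live inside the org.apache.spark.sql.parquet package since these classes are package-private), that drives CatalystReadSupport, and therefore this materializer, through parquet-hadoop's ParquetReader:

    import org.apache.hadoop.fs.Path
    import org.apache.parquet.hadoop.ParquetReader
    import org.apache.spark.sql.catalyst.InternalRow

    // Read every row of a Parquet file as Catalyst InternalRows.
    val reader: ParquetReader[InternalRow] =
      ParquetReader.builder(new CatalystReadSupport, new Path("/tmp/example.parquet")).build()
    try {
      var row = reader.read() // null signals end of input
      while (row != null) {
        // each InternalRow here was assembled via this materializer's root converter
        row = reader.read()
      }
    } finally {
      reader.close()
    }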
sql/core/src/main/scala/org/apache/spark/sql/parquet/CatalystSchemaConverter.scala
@@ -570,6 +570,11 @@ private[parquet] object CatalystSchemaConverter {
""".stripMargin.split("\n").mkString(" "))
}

+  def checkFieldNames(schema: StructType): StructType = {
+    schema.fieldNames.foreach(checkFieldName)
+    schema
+  }

def analysisRequire(f: => Boolean, message: String): Unit = {
if (!f) {
throw new AnalysisException(message)
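The new `checkFieldNames` helper validates every top-level field name and returns the schema unchanged so the call can be chained into the write path. A hypothetical usage sketch (the schema literal is invented for illustration, and the exact set of rejected characters follows `checkFieldName`, which is not shown in this hunk; assumes code inside the org.apache.spark.sql.parquet package, since the converter object is package-private):

    import org.apache.spark.sql.types.{IntegerType, StringType, StructField, StructType}

    val schema = StructType(
      StructField("id", IntegerType) ::
      StructField("event name", StringType) :: Nil)  // assumed invalid: the name contains a space

    // Throws AnalysisException for an unsupported field name; otherwise returns `schema` as-is.
    val checked = CatalystSchemaConverter.checkFieldNames(schema)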