@@ -310,7 +310,7 @@ private[kafka010] class KafkaSource(
       currentPartitionOffsets = Some(untilPartitionOffsets)
     }

-    sqlContext.internalCreateDataFrame(rdd, schema)
+    sqlContext.internalCreateDataFrame(rdd, schema, isStreaming = true)
   }

   /** Stop this source and free any resources it has allocated. */
@@ -1175,14 +1175,14 @@ object DecimalAggregates extends Rule[LogicalPlan] {
  */
 object ConvertToLocalRelation extends Rule[LogicalPlan] {
   def apply(plan: LogicalPlan): LogicalPlan = plan transform {
-    case Project(projectList, LocalRelation(output, data))
+    case Project(projectList, LocalRelation(output, data, isStreaming))
         if !projectList.exists(hasUnevaluableExpr) =>
       val projection = new InterpretedProjection(projectList, output)
       projection.initialize(0)
-      LocalRelation(projectList.map(_.toAttribute), data.map(projection))
+      LocalRelation(projectList.map(_.toAttribute), data.map(projection), isStreaming)

-    case Limit(IntegerLiteral(limit), LocalRelation(output, data)) =>
-      LocalRelation(output, data.take(limit))
+    case Limit(IntegerLiteral(limit), LocalRelation(output, data, isStreaming)) =>
+      LocalRelation(output, data.take(limit), isStreaming)
   }

   private def hasUnevaluableExpr(expr: Expression): Boolean = {
@@ -43,7 +43,10 @@ object LocalRelation {
   }
 }

-case class LocalRelation(output: Seq[Attribute], data: Seq[InternalRow] = Nil)
+case class LocalRelation(output: Seq[Attribute],
+    data: Seq[InternalRow] = Nil,
+    // Indicates whether this relation came from a streaming source.
+    override val isStreaming: Boolean = false)
   extends LeafNode with analysis.MultiInstanceRelation {

   // A local relation must have resolved output.

Review thread on the new isStreaming parameter:
Contributor: Can you add docs to explain what isStreaming is?
Contributor (author): Done. (I think this is a correct summary?)
Contributor: Make sure this is the same as the updated isStreaming docs (see my other comments).
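For context on what the flag means (a paraphrase of the isStreaming contract being discussed above, not code from this diff): leaf relations such as LocalRelation state the bit themselves, and non-leaf plans are treated as streaming whenever any of their children is. A self-contained Scala sketch of that relationship, using hypothetical names:

abstract class SketchPlan {
  def children: Seq[SketchPlan]
  // Default: a plan is streaming if any child is streaming.
  def isStreaming: Boolean = children.exists(_.isStreaming)
}

// A leaf has no children, so it must carry the flag itself;
// that is what the new LocalRelation parameter does.
case class SketchLeaf(override val isStreaming: Boolean) extends SketchPlan {
  override def children: Seq[SketchPlan] = Nil
}

case class SketchJoin(left: SketchPlan, right: SketchPlan) extends SketchPlan {
  override def children: Seq[SketchPlan] = Seq(left, right)
}

object SketchDemo extends App {
  val plan = SketchJoin(SketchLeaf(isStreaming = true), SketchLeaf(isStreaming = false))
  assert(plan.isStreaming)  // true, because one input is streaming
}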
@@ -429,9 +429,10 @@ case class Sort(

 /** Factory for constructing new `Range` nodes. */
 object Range {
-  def apply(start: Long, end: Long, step: Long, numSlices: Option[Int]): Range = {
+  def apply(start: Long, end: Long, step: Long,
+      numSlices: Option[Int], isStreaming: Boolean = false): Range = {
     val output = StructType(StructField("id", LongType, nullable = false) :: Nil).toAttributes
-    new Range(start, end, step, numSlices, output)
+    new Range(start, end, step, numSlices, output, isStreaming)
   }
   def apply(start: Long, end: Long, step: Long, numSlices: Int): Range = {
     Range(start, end, step, Some(numSlices))
@@ -443,7 +444,8 @@ case class Range(
     end: Long,
     step: Long,
     numSlices: Option[Int],
-    output: Seq[Attribute])
+    output: Seq[Attribute],
+    override val isStreaming: Boolean)
   extends LeafNode with MultiInstanceRelation {

   require(step != 0, s"step ($step) cannot be 0")

Review thread on the new isStreaming parameter of Range:
Contributor: how can a Range have data from a streaming source?
Contributor (author): I don't think there's necessarily a reason it shouldn't be able to; streaming sources are free to define getBatch() however they'd like. Right now the only source actually doing that is a fake source in StreamSuite.
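To make the reply above concrete, here is a minimal sketch of a source whose batches are built from Range rather than from any external data. It is a hypothetical illustration, not the actual StreamSuite fake source, and it assumes the Spark 2.x Source API in org.apache.spark.sql.execution.streaming:

import org.apache.spark.sql.{DataFrame, SQLContext}
import org.apache.spark.sql.execution.streaming.{LongOffset, Offset, Source}
import org.apache.spark.sql.types.{LongType, StructField, StructType}

// A source whose micro-batches are produced by a Range; with this change the
// resulting logical plan can still be flagged as streaming.
class RangeBackedSource(sqlContext: SQLContext) extends Source {

  override def schema: StructType =
    StructType(StructField("id", LongType, nullable = false) :: Nil)

  // Pretend ten rows are always available.
  override def getOffset: Option[Offset] = Some(LongOffset(10))

  override def getBatch(start: Option[Offset], end: Offset): DataFrame = {
    val from = start.collect { case LongOffset(v) => v + 1 }.getOrElse(0L)
    val to = end.asInstanceOf[LongOffset].offset + 1
    // The batch is just a Range; nothing about it is inherently "streaming".
    sqlContext.range(from, to).toDF("id")
  }

  override def stop(): Unit = {}
}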
@@ -93,7 +93,7 @@ class ResolveInlineTablesSuite extends AnalysisTest with BeforeAndAfter {
     val table = UnresolvedInlineTable(Seq("c1"),
       Seq(Seq(Cast(lit("1991-12-06 00:00:00.0"), TimestampType))))
     val withTimeZone = ResolveTimeZone(conf).apply(table)
-    val LocalRelation(output, data) = ResolveInlineTables(conf).apply(withTimeZone)
+    val LocalRelation(output, data, _) = ResolveInlineTables(conf).apply(withTimeZone)
     val correct = Cast(lit("1991-12-06 00:00:00.0"), TimestampType)
       .withTimeZone(conf.sessionLocalTimeZone).eval().asInstanceOf[Long]
     assert(output.map(_.dataType) == Seq(TimestampType))
@@ -73,10 +73,8 @@ class LogicalPlanSuite extends SparkFunSuite {

   test("isStreaming") {
     val relation = LocalRelation(AttributeReference("a", IntegerType, nullable = true)())
-    val incrementalRelation = new LocalRelation(
-      Seq(AttributeReference("a", IntegerType, nullable = true)())) {
-      override def isStreaming(): Boolean = true
-    }
+    val incrementalRelation = LocalRelation(
+      Seq(AttributeReference("a", IntegerType, nullable = true)()), isStreaming = true)

     case class TestBinaryRelation(left: LogicalPlan, right: LogicalPlan) extends BinaryNode {
       override def output: Seq[Attribute] = left.output ++ right.output
@@ -410,7 +410,7 @@ class DataFrameReader private[sql](sparkSession: SparkSession) extends Logging {

     Dataset.ofRows(
       sparkSession,
-      LogicalRDD(schema.toAttributes, parsed)(sparkSession))
+      LogicalRDD(schema.toAttributes, parsed, isStreaming = jsonDataset.isStreaming)(sparkSession))
   }

   /**
@@ -473,7 +473,7 @@ class DataFrameReader private[sql](sparkSession: SparkSession) extends Logging {

     Dataset.ofRows(
       sparkSession,
-      LogicalRDD(schema.toAttributes, parsed)(sparkSession))
+      LogicalRDD(schema.toAttributes, parsed, isStreaming = csvDataset.isStreaming)(sparkSession))
   }

   /**
@@ -371,14 +371,14 @@ final class DataFrameWriter[T] private[sql](ds: Dataset[T]) {
       case (true, SaveMode.Overwrite) =>
         // Get all input data source or hive relations of the query.
         val srcRelations = df.logicalPlan.collect {
-          case LogicalRelation(src: BaseRelation, _, _) => src
+          case LogicalRelation(src: BaseRelation, _, _, _) => src
           case relation: HiveTableRelation => relation.tableMeta.identifier
         }

         val tableRelation = df.sparkSession.table(tableIdentWithDB).queryExecution.analyzed
         EliminateSubqueryAliases(tableRelation) match {
           // check if the table is a data source table (the relation is a BaseRelation).
-          case LogicalRelation(dest: BaseRelation, _, _) if srcRelations.contains(dest) =>
+          case LogicalRelation(dest: BaseRelation, _, _, _) if srcRelations.contains(dest) =>
             throw new AnalysisException(
               s"Cannot overwrite table $tableName that is also being read from")
           // check hive table relation when overwrite mode
sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala (3 additions, 2 deletions)
@@ -569,7 +569,8 @@ class Dataset[T] private[sql](
       logicalPlan.output,
       internalRdd,
       outputPartitioning,
-      physicalPlan.outputOrdering
+      physicalPlan.outputOrdering,
+      isStreaming
     )(sparkSession)).as[T]
   }

@@ -2993,7 +2994,7 @@ class Dataset[T] private[sql](
    */
   def inputFiles: Array[String] = {
     val files: Seq[String] = queryExecution.optimizedPlan.collect {
-      case LogicalRelation(fsBasedRelation: FileRelation, _, _) =>
+      case LogicalRelation(fsBasedRelation: FileRelation, _, _, _) =>
         fsBasedRelation.inputFiles
       case fr: FileRelation =>
         fr.inputFiles
sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala (4 additions, 2 deletions)
@@ -420,8 +420,10 @@ class SQLContext private[sql](val sparkSession: SparkSession)
    * converted to Catalyst rows.
    */
   private[sql]
-  def internalCreateDataFrame(catalystRows: RDD[InternalRow], schema: StructType) = {
-    sparkSession.internalCreateDataFrame(catalystRows, schema)
+  def internalCreateDataFrame(catalystRows: RDD[InternalRow],
+      schema: StructType,
+      isStreaming: Boolean = false) = {
+    sparkSession.internalCreateDataFrame(catalystRows, schema, isStreaming)
   }

   /**

Review thread on the new method signature:
Contributor: nit: The correct code style for multiline param definition is

  def function(
      param1: type1,      // double indent, i.e. 4 spaces
      param2: type2)

  See the indentation section in http://spark.apache.org/contributing.html
Contributor (author): Done.
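Applying the indentation nit above to this method, the signature would look roughly as follows. This is a sketch in a hypothetical standalone object, not the committed code:

import org.apache.spark.rdd.RDD
import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.types.StructType

object IndentationSketch {
  // Parameters double-indented (4 spaces), body at the usual 2 spaces.
  def internalCreateDataFrame(
      catalystRows: RDD[InternalRow],
      schema: StructType,
      isStreaming: Boolean = false): Option[DataFrame] = {
    // The real method delegates to
    // sparkSession.internalCreateDataFrame(catalystRows, schema, isStreaming).
    None
  }
}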
@@ -564,10 +564,14 @@ class SparkSession private(
    */
   private[sql] def internalCreateDataFrame(
       catalystRows: RDD[InternalRow],
-      schema: StructType): DataFrame = {
+      schema: StructType,
+      isStreaming: Boolean = false): DataFrame = {
     // TODO: use MutableProjection when rowRDD is another DataFrame and the applied
     // schema differs from the existing schema on any field data type.
-    val logicalPlan = LogicalRDD(schema.toAttributes, catalystRows)(self)
+    val logicalPlan = LogicalRDD(
+      schema.toAttributes,
+      catalystRows,
+      isStreaming = isStreaming)(self)
     Dataset.ofRows(self, logicalPlan)
   }

Review thread on the isStreaming = isStreaming call:
Contributor: Passing just isStreaming is fine; isStreaming = isStreaming is overkill. It's only useful when the value is a constant, e.g. isStreaming = true.
Contributor (author): It's necessary here because there are two other default arguments in the constructor.
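A small, generic Scala example of the point in the reply: when defaulted parameters sit in front of the one being set, the named form is what allows the defaults to be skipped (illustrative code only, not from the PR):

object NamedArgSketch {
  // Mirrors the shape of LogicalRDD's parameter list: two defaulted parameters
  // sit between the required ones and the streaming flag.
  def build(
      output: String,
      rdd: String,
      partitioning: Int = 0,
      ordering: Int = 0,
      isStreaming: Boolean = false): String =
    s"$output/$rdd/$partitioning/$ordering/$isStreaming"

  def main(args: Array[String]): Unit = {
    val flag = true
    // build("out", "rdd", flag) would not compile: the Boolean would be
    // offered to `partitioning`, which expects an Int.
    println(build("out", "rdd", isStreaming = flag))  // defaults are skipped
  }
}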
@@ -125,7 +125,8 @@ case class LogicalRDD(
     output: Seq[Attribute],
     rdd: RDD[InternalRow],
     outputPartitioning: Partitioning = UnknownPartitioning(0),
-    outputOrdering: Seq[SortOrder] = Nil)(session: SparkSession)
+    outputOrdering: Seq[SortOrder] = Nil,
+    override val isStreaming: Boolean = false)(session: SparkSession)
   extends LeafNode with MultiInstanceRelation {

   override protected final def otherCopyArgs: Seq[AnyRef] = session :: Nil
@@ -150,11 +151,12 @@ case class LogicalRDD(
       output.map(rewrite),
       rdd,
       rewrittenPartitioning,
-      rewrittenOrdering
+      rewrittenOrdering,
+      isStreaming
     )(session).asInstanceOf[this.type]
   }

-  override protected def stringArgs: Iterator[Any] = Iterator(output)
+  override protected def stringArgs: Iterator[Any] = Iterator(output, isStreaming)

   override def computeStats(): Statistics = Statistics(
     // TODO: Instead of returning a default value here, find a way to return a meaningful size
@@ -94,10 +94,10 @@ case class OptimizeMetadataOnlyQuery(catalog: SessionCatalog) extends Rule[Logic
         child transform {
           case plan if plan eq relation =>
             relation match {
-              case l @ LogicalRelation(fsRelation: HadoopFsRelation, _, _) =>
+              case l @ LogicalRelation(fsRelation: HadoopFsRelation, _, _, isStreaming) =>
                 val partAttrs = getPartitionAttrs(fsRelation.partitionSchema.map(_.name), l)
                 val partitionData = fsRelation.location.listFiles(Nil, Nil)
-                LocalRelation(partAttrs, partitionData.map(_.values))
+                LocalRelation(partAttrs, partitionData.map(_.values), isStreaming)

               case relation: HiveTableRelation =>
                 val partAttrs = getPartitionAttrs(relation.tableMeta.partitionColumnNames, relation)
@@ -130,7 +130,7 @@ case class OptimizeMetadataOnlyQuery(catalog: SessionCatalog) extends Rule[Logic
 object PartitionedRelation {

   def unapply(plan: LogicalPlan): Option[(AttributeSet, LogicalPlan)] = plan match {
-    case l @ LogicalRelation(fsRelation: HadoopFsRelation, _, _)
+    case l @ LogicalRelation(fsRelation: HadoopFsRelation, _, _, _)
         if fsRelation.partitionSchema.nonEmpty =>
       val partAttrs = getPartitionAttrs(fsRelation.partitionSchema.map(_.name), l)
       Some((AttributeSet(partAttrs), l))
@@ -410,7 +410,7 @@ abstract class SparkStrategies extends QueryPlanner[SparkPlan] {
         execution.window.WindowExec(windowExprs, partitionSpec, orderSpec, planLater(child)) :: Nil
       case logical.Sample(lb, ub, withReplacement, seed, child) =>
         execution.SampleExec(lb, ub, withReplacement, seed, planLater(child)) :: Nil
-      case logical.LocalRelation(output, data) =>
+      case logical.LocalRelation(output, data, _) =>
         LocalTableScanExec(output, data) :: Nil
       case logical.LocalLimit(IntegerLiteral(limit), child) =>
         execution.LocalLimitExec(limit, planLater(child)) :: Nil
@@ -455,7 +455,7 @@ case class DataSource(

     val fileIndex = catalogTable.map(_.identifier).map { tableIdent =>
       sparkSession.table(tableIdent).queryExecution.analyzed.collect {
-        case LogicalRelation(t: HadoopFsRelation, _, _) => t.location
+        case LogicalRelation(t: HadoopFsRelation, _, _, _) => t.location
       }.head
     }
     // For partitioned relation r, r.schema's column ordering can be different from the column
@@ -136,12 +136,12 @@ case class DataSourceAnalysis(conf: SQLConf) extends Rule[LogicalPlan] with Cast
         if query.resolved && DDLUtils.isDatasourceTable(tableDesc) =>
       CreateDataSourceTableAsSelectCommand(tableDesc, mode, query)

-    case InsertIntoTable(l @ LogicalRelation(_: InsertableRelation, _, _),
+    case InsertIntoTable(l @ LogicalRelation(_: InsertableRelation, _, _, _),
         parts, query, overwrite, false) if parts.isEmpty =>
       InsertIntoDataSourceCommand(l, query, overwrite)

     case i @ InsertIntoTable(
-        l @ LogicalRelation(t: HadoopFsRelation, _, table), parts, query, overwrite, _) =>
+        l @ LogicalRelation(t: HadoopFsRelation, _, table, _), parts, query, overwrite, _) =>
       // If the InsertIntoTable command is for a partitioned HadoopFsRelation and
       // the user has specified static partitions, we add a Project operator on top of the query
       // to include those constant column values in the query result.
@@ -177,7 +177,7 @@ case class DataSourceAnalysis(conf: SQLConf) extends Rule[LogicalPlan] with Cast

       val outputPath = t.location.rootPaths.head
       val inputPaths = actualQuery.collect {
-        case LogicalRelation(r: HadoopFsRelation, _, _) => r.location.rootPaths
+        case LogicalRelation(r: HadoopFsRelation, _, _, _) => r.location.rootPaths
       }.flatten

       val mode = if (overwrite) SaveMode.Overwrite else SaveMode.Append
@@ -268,29 +268,30 @@ case class DataSourceStrategy(conf: SQLConf) extends Strategy with Logging with
   import DataSourceStrategy._

   def apply(plan: LogicalPlan): Seq[execution.SparkPlan] = plan match {
-    case PhysicalOperation(projects, filters, l @ LogicalRelation(t: CatalystScan, _, _)) =>
+    case PhysicalOperation(projects, filters, l @ LogicalRelation(t: CatalystScan, _, _, _)) =>
       pruneFilterProjectRaw(
         l,
         projects,
         filters,
         (requestedColumns, allPredicates, _) =>
           toCatalystRDD(l, requestedColumns, t.buildScan(requestedColumns, allPredicates))) :: Nil

-    case PhysicalOperation(projects, filters, l @ LogicalRelation(t: PrunedFilteredScan, _, _)) =>
+    case PhysicalOperation(projects, filters,
+        l @ LogicalRelation(t: PrunedFilteredScan, _, _, _)) =>
       pruneFilterProject(
         l,
         projects,
         filters,
         (a, f) => toCatalystRDD(l, a, t.buildScan(a.map(_.name).toArray, f))) :: Nil

-    case PhysicalOperation(projects, filters, l @ LogicalRelation(t: PrunedScan, _, _)) =>
+    case PhysicalOperation(projects, filters, l @ LogicalRelation(t: PrunedScan, _, _, _)) =>
       pruneFilterProject(
         l,
         projects,
         filters,
         (a, _) => toCatalystRDD(l, a, t.buildScan(a.map(_.name).toArray))) :: Nil

-    case l @ LogicalRelation(baseRelation: TableScan, _, _) =>
+    case l @ LogicalRelation(baseRelation: TableScan, _, _, _) =>
       RowDataSourceScanExec(
         l.output,
         l.output.indices,
@@ -52,7 +52,7 @@ import org.apache.spark.sql.execution.SparkPlan
 object FileSourceStrategy extends Strategy with Logging {
   def apply(plan: LogicalPlan): Seq[SparkPlan] = plan match {
     case PhysicalOperation(projects, filters,
-      l @ LogicalRelation(fsRelation: HadoopFsRelation, _, table)) =>
+      l @ LogicalRelation(fsRelation: HadoopFsRelation, _, table, _)) =>
       // Filters on this relation fall into four categories based on where we can use them to avoid
       // reading unneeded data:
       //  - partition keys only - used to prune directories to read
@@ -30,12 +30,14 @@ import org.apache.spark.util.Utils
 case class LogicalRelation(
     relation: BaseRelation,
     output: Seq[AttributeReference],
-    catalogTable: Option[CatalogTable])
+    catalogTable: Option[CatalogTable],
+    override val isStreaming: Boolean)
   extends LeafNode with MultiInstanceRelation {

   // Logical Relations are distinct if they have different output for the sake of transformations.
   override def equals(other: Any): Boolean = other match {
-    case l @ LogicalRelation(otherRelation, _, _) => relation == otherRelation && output == l.output
+    case l @ LogicalRelation(otherRelation, _, _, isStreaming) =>
+      relation == otherRelation && output == l.output && isStreaming == l.isStreaming
     case _ => false
   }

@@ -76,9 +78,9 @@ case class LogicalRelation(
 }

 object LogicalRelation {
-  def apply(relation: BaseRelation): LogicalRelation =
-    LogicalRelation(relation, relation.schema.toAttributes, None)
+  def apply(relation: BaseRelation, isStreaming: Boolean = false): LogicalRelation =
+    LogicalRelation(relation, relation.schema.toAttributes, None, isStreaming)

   def apply(relation: BaseRelation, table: CatalogTable): LogicalRelation =
-    LogicalRelation(relation, relation.schema.toAttributes, Some(table))
+    LogicalRelation(relation, relation.schema.toAttributes, Some(table), false)
 }
@@ -36,6 +36,7 @@ private[sql] object PruneFileSourcePartitions extends Rule[LogicalPlan] {
             _,
             _),
           _,
+          _,
           _))
         if filters.nonEmpty && fsRelation.partitionSchemaOption.isDefined =>
       // The attribute name of predicate could be different than the one in schema in case of
@@ -385,10 +385,10 @@ case class PreprocessTableInsertion(conf: SQLConf) extends Rule[LogicalPlan] wit
       case relation: HiveTableRelation =>
         val metadata = relation.tableMeta
         preprocess(i, metadata.identifier.quotedString, metadata.partitionColumnNames)
-      case LogicalRelation(h: HadoopFsRelation, _, catalogTable) =>
+      case LogicalRelation(h: HadoopFsRelation, _, catalogTable, _) =>
         val tblName = catalogTable.map(_.identifier.quotedString).getOrElse("unknown")
         preprocess(i, tblName, h.partitionSchema.map(_.name))
-      case LogicalRelation(_: InsertableRelation, _, catalogTable) =>
+      case LogicalRelation(_: InsertableRelation, _, catalogTable, _) =>
         val tblName = catalogTable.map(_.identifier.quotedString).getOrElse("unknown")
         preprocess(i, tblName, Nil)
       case _ => i
@@ -428,7 +428,7 @@ object PreReadCheck extends (LogicalPlan => Unit) {
   private def checkNumInputFileBlockSources(e: Expression, operator: LogicalPlan): Int = {
     operator match {
       case _: HiveTableRelation => 1
-      case _ @ LogicalRelation(_: HadoopFsRelation, _, _) => 1
+      case _ @ LogicalRelation(_: HadoopFsRelation, _, _, _) => 1
       case _: LeafNode => 0
       // UNION ALL has multiple children, but these children do not concurrently use InputFileBlock.
       case u: Union =>
@@ -454,10 +454,10 @@ object PreWriteCheck extends (LogicalPlan => Unit) {

   def apply(plan: LogicalPlan): Unit = {
     plan.foreach {
-      case InsertIntoTable(l @ LogicalRelation(relation, _, _), partition, query, _, _) =>
+      case InsertIntoTable(l @ LogicalRelation(relation, _, _, _), partition, query, _, _) =>
         // Get all input data source relations of the query.
         val srcRelations = query.collect {
-          case LogicalRelation(src, _, _) => src
+          case LogicalRelation(src, _, _, _) => src
         }
         if (srcRelations.contains(relation)) {
          failAnalysis("Cannot insert into table that is also being read from.")
@@ -171,7 +171,7 @@ class FileStreamSource(
         className = fileFormatClassName,
         options = optionsWithPartitionBasePath)
       Dataset.ofRows(sparkSession, LogicalRelation(newDataSource.resolveRelation(
-        checkFilesExist = false)))
+        checkFilesExist = false), isStreaming = true))
   }

   /**