add AnalysisBarrier
gengliangwang committed Dec 29, 2017
commit bdffa6d42c7912a4ed63a64e756c7bf66eb0a274
@@ -37,6 +37,8 @@ trait DataWritingCommand extends Command {
    * The input query plan that produces the data to be written.
    * IMPORTANT: the input query plan MUST be analyzed, so that we can carry its output columns
    * to [[FileFormatWriter]].
+   * For performance reasons, it is suggested to construct [[DataWritingCommand]] with its input
+   * query wrapped in [[AnalysisBarrier]], to avoid the query being analyzed again.
    */
   def query: LogicalPlan
 
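For context on the class the doc comment now references: AnalysisBarrier is a wrapper node that hides an already-analyzed subtree from the analyzer. The following is a minimal sketch of the idea, not Spark's exact definition; the real class has a few extra overrides, but the essential trick is the same.

    import org.apache.spark.sql.catalyst.expressions.Attribute
    import org.apache.spark.sql.catalyst.plans.logical.{LeafNode, LogicalPlan}

    // A leaf node that wraps an analyzed plan. Because it is a LeafNode, tree
    // transformations (including the analyzer's resolution rules) do not recurse
    // into `child`, so the wrapped plan is never analyzed a second time, while
    // `output` still exposes the child's resolved schema to consumers.
    case class AnalysisBarrier(child: LogicalPlan) extends LeafNode {
      override def output: Seq[Attribute] = child.output
      override def isStreaming: Boolean = child.isStreaming
    }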
@@ -31,7 +31,7 @@ import org.apache.spark.internal.Logging
 import org.apache.spark.sql._
 import org.apache.spark.sql.catalyst.analysis.UnresolvedAttribute
 import org.apache.spark.sql.catalyst.catalog.{BucketSpec, CatalogStorageFormat, CatalogTable, CatalogUtils}
-import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
+import org.apache.spark.sql.catalyst.plans.logical.{AnalysisBarrier, LogicalPlan}
 import org.apache.spark.sql.catalyst.util.CaseInsensitiveMap
 import org.apache.spark.sql.execution.datasources.csv.CSVFileFormat
 import org.apache.spark.sql.execution.datasources.jdbc.JdbcRelationProvider
@@ -472,7 +472,7 @@ case class DataSource(
         bucketSpec = bucketSpec,
         fileFormat = format,
         options = options,
-        query = data,
+        query = AnalysisBarrier(data),
         mode = mode,
         catalogTable = catalogTable,
         fileIndex = fileIndex,
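The call-site pattern is the same at every change in this commit: the already-analyzed child is wrapped exactly once, at the point where it is handed to the write command. A small demonstration of why this stops re-analysis, built on the sketch above (OneRowRelation and Project are just convenient stand-ins for a resolved plan, assuming Spark 2.3-era APIs):

    import org.apache.spark.sql.catalyst.plans.logical.{OneRowRelation, Project}

    val resolved: LogicalPlan = Project(Nil, OneRowRelation())  // stands in for an analyzed query
    val barrier = AnalysisBarrier(resolved)

    // transformUp walks children first, but a LeafNode has no children, so the
    // rule below never reaches `resolved` -- analyzer rules stop at the barrier.
    val result = barrier.transformUp { case p: Project => p.copy(projectList = Nil) }
    assert(result eq barrier)  // the wrapped Project was never visited or rewritten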
@@ -32,7 +32,7 @@ import org.apache.spark.sql.catalyst.catalog._
 import org.apache.spark.sql.catalyst.expressions
 import org.apache.spark.sql.catalyst.expressions._
 import org.apache.spark.sql.catalyst.planning.PhysicalOperation
-import org.apache.spark.sql.catalyst.plans.logical.{InsertIntoDir, InsertIntoTable, LogicalPlan, Project}
+import org.apache.spark.sql.catalyst.plans.logical.{Filter => _, _}
 import org.apache.spark.sql.catalyst.plans.physical.HashPartitioning
 import org.apache.spark.sql.catalyst.rules.Rule
 import org.apache.spark.sql.execution.{RowDataSourceScanExec, SparkPlan}
@@ -205,7 +205,7 @@ case class DataSourceAnalysis(conf: SQLConf) extends Rule[LogicalPlan] with Cast
         t.bucketSpec,
         t.fileFormat,
         t.options,
-        actualQuery,
+        AnalysisBarrier(actualQuery),
         mode,
         table,
         Some(t.location),
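One non-obvious bit in the import change above: `{Filter => _, _}` is Scala's wildcard-import-with-exclusion idiom. It pulls in everything from plans.logical (including AnalysisBarrier, so the hunk below compiles) except Filter, presumably because another Filter is already in scope in this file. The same idiom in self-contained form:

    // Import everything from scala.collection.mutable except Map,
    // leaving the name Map bound to the default immutable Map.
    import scala.collection.mutable.{Map => _, _}

    val buf = Buffer(1, 2, 3)   // mutable.Buffer, brought in by the wildcard
    val m = Map("a" -> 1)       // still scala.Predef's immutable Map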
@@ -26,8 +26,7 @@ import org.apache.spark.sql._
 import org.apache.spark.sql.catalyst.catalog._
 import org.apache.spark.sql.catalyst.expressions._
 import org.apache.spark.sql.catalyst.planning._
-import org.apache.spark.sql.catalyst.plans.logical.{InsertIntoDir, InsertIntoTable, LogicalPlan,
-  ScriptTransformation}
+import org.apache.spark.sql.catalyst.plans.logical._
 import org.apache.spark.sql.catalyst.rules.Rule
 import org.apache.spark.sql.execution._
 import org.apache.spark.sql.execution.command.{CreateTableCommand, DDLUtils}
@@ -148,7 +147,7 @@ object HiveAnalysis extends Rule[LogicalPlan] {
   override def apply(plan: LogicalPlan): LogicalPlan = plan transformUp {
     case InsertIntoTable(r: HiveTableRelation, partSpec, query, overwrite, ifPartitionNotExists)
         if DDLUtils.isHiveTable(r.tableMeta) =>
-      InsertIntoHiveTable(r.tableMeta, partSpec, query, overwrite,
+      InsertIntoHiveTable(r.tableMeta, partSpec, AnalysisBarrier(query), overwrite,
         ifPartitionNotExists, query.output)
 
     case CreateTable(tableDesc, mode, None) if DDLUtils.isHiveTable(tableDesc) =>
@@ -164,7 +163,7 @@ object HiveAnalysis extends Rule[LogicalPlan] {
       val outputPath = new Path(storage.locationUri.get)
       if (overwrite) DDLUtils.verifyNotReadPath(child, outputPath)
 
-      InsertIntoHiveDirCommand(isLocal, storage, child, overwrite, child.output)
+      InsertIntoHiveDirCommand(isLocal, storage, AnalysisBarrier(child), overwrite, child.output)
   }
 }
 
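A detail worth calling out in the two Hive changes above: the output-columns argument stays query.output / child.output rather than going through the barrier. Under the sketch near the top, AnalysisBarrier(query).output is identical to query.output (the barrier's output simply delegates to its child), so reading the schema off the unwrapped plan loses nothing and keeps the intent clear.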