Conversation

@onursatici
Contributor

What changes were proposed in this pull request?

As of #21018, InMemoryRelation includes its cacheBuilder when logging query plans. This PR changes the string representation of CachedRDDBuilder so that it no longer includes the cached Spark plan.
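
For reference, a minimal sketch of the idea using a simplified stand-in class (field names follow the diff quoted later in this thread; the real class also carries a tableName and its cached field is a SparkPlan, not AnyRef):

    // The default case-class toString renders every constructor field,
    // including the whole cached plan, so override it to print only the
    // scalar configuration.
    case class CachedRDDBuilder(
        useCompression: Boolean,
        batchSize: Int,
        storageLevel: String,   // simplified; the real field is a StorageLevel
        cachedPlan: AnyRef) {   // simplified; the real field is the cached SparkPlan
      override def toString: String =
        s"CachedRDDBuilder($useCompression, $batchSize, $storageLevel)"
    }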

How was this patch tested?

In spark-shell, with a query that stacks cached joins to produce nested InMemoryRelations:

var df_cached = spark.read.format("csv").option("header", "true").load("test.csv").cache()
0 to 1 foreach { _ =>
df_cached = df_cached.join(spark.read.format("csv").option("header", "true").load("test.csv"), "A").cache()
}
df_cached.explain

As of master, this results in:

== Physical Plan ==
InMemoryTableScan [A#10, B#11, B#35, B#87]
+- InMemoryRelation [A#10, B#11, B#35, B#87], CachedRDDBuilder(true,10000,StorageLevel(disk, memory, deserialized, 1 replicas),*(2) Project [A#10, B#11, B#35, B#87]
+- *(2) BroadcastHashJoin [A#10], [A#86], Inner, BuildRight
:- *(2) Filter isnotnull(A#10)
: +- InMemoryTableScan [A#10, B#11, B#35], [isnotnull(A#10)]
: +- InMemoryRelation [A#10, B#11, B#35], CachedRDDBuilder(true,10000,StorageLevel(disk, memory, deserialized, 1 replicas),*(2) Project [A#10, B#11, B#35]
+- *(2) BroadcastHashJoin [A#10], [A#34], Inner, BuildRight
:- *(2) Filter isnotnull(A#10)
: +- InMemoryTableScan [A#10, B#11], [isnotnull(A#10)]
: +- InMemoryRelation [A#10, B#11], CachedRDDBuilder(true,10000,StorageLevel(disk, memory, deserialized, 1 replicas),*(1) FileScan csv [A#10,B#11] Batched: false, Format: CSV, Location: InMemoryFileIndex[file:test.csv], PartitionFilters: [], PushedFilters: [], ReadSchema: struct<A:string,B:string>
,None)
: +- *(1) FileScan csv [A#10,B#11] Batched: false, Format: CSV, Location: InMemoryFileIndex[file:test.csv], PartitionFilters: [], PushedFilters: [], ReadSchema: struct<A:string,B:string>
+- BroadcastExchange HashedRelationBroadcastMode(List(input[0, string, false]))
+- *(1) Filter isnotnull(A#34)
+- InMemoryTableScan [A#34, B#35], [isnotnull(A#34)]
+- InMemoryRelation [A#34, B#35], CachedRDDBuilder(true,10000,StorageLevel(disk, memory, deserialized, 1 replicas),*(1) FileScan csv [A#10,B#11] Batched: false, Format: CSV, Location: InMemoryFileIndex[file:test.csv], PartitionFilters: [], PushedFilters: [], ReadSchema: struct<A:string,B:string>
,None)
+- *(1) FileScan csv [A#10,B#11] Batched: false, Format: CSV, Location: InMemoryFileIndex[file:test.csv], PartitionFilters: [], PushedFilters: [], ReadSchema: struct<A:string,B:string>
,None)
: +- *(2) Project [A#10, B#11, B#35]
: +- *(2) BroadcastHashJoin [A#10], [A#34], Inner, BuildRight
: :- *(2) Filter isnotnull(A#10)
: : +- InMemoryTableScan [A#10, B#11], [isnotnull(A#10)]
: : +- InMemoryRelation [A#10, B#11], CachedRDDBuilder(true,10000,StorageLevel(disk, memory, deserialized, 1 replicas),*(1) FileScan csv [A#10,B#11] Batched: false, Format: CSV, Location: InMemoryFileIndex[file:test.csv], PartitionFilters: [], PushedFilters: [], ReadSchema: struct<A:string,B:string>
,None)
: : +- *(1) FileScan csv [A#10,B#11] Batched: false, Format: CSV, Location: InMemoryFileIndex[file:test.csv], PartitionFilters: [], PushedFilters: [], ReadSchema: struct<A:string,B:string>
: +- BroadcastExchange HashedRelationBroadcastMode(List(input[0, string, false]))
: +- *(1) Filter isnotnull(A#34)
: +- InMemoryTableScan [A#34, B#35], [isnotnull(A#34)]
: +- InMemoryRelation [A#34, B#35], CachedRDDBuilder(true,10000,StorageLevel(disk, memory, deserialized, 1 replicas),*(1) FileScan csv [A#10,B#11] Batched: false, Format: CSV, Location: InMemoryFileIndex[file:test.csv], PartitionFilters: [], PushedFilters: [], ReadSchema: struct<A:string,B:string>
,None)
: +- *(1) FileScan csv [A#10,B#11] Batched: false, Format: CSV, Location: InMemoryFileIndex[file:test.csv], PartitionFilters: [], PushedFilters: [], ReadSchema: struct<A:string,B:string>
+- BroadcastExchange HashedRelationBroadcastMode(List(input[0, string, false]))
+- *(1) Filter isnotnull(A#86)
+- InMemoryTableScan [A#86, B#87], [isnotnull(A#86)]
+- InMemoryRelation [A#86, B#87], CachedRDDBuilder(true,10000,StorageLevel(disk, memory, deserialized, 1 replicas),*(1) FileScan csv [A#10,B#11] Batched: false, Format: CSV, Location: InMemoryFileIndex[file:test.csv], PartitionFilters: [], PushedFilters: [], ReadSchema: struct<A:string,B:string>
,None)
+- *(1) FileScan csv [A#10,B#11] Batched: false, Format: CSV, Location: InMemoryFileIndex[file:test.csv], PartitionFilters: [], PushedFilters: [], ReadSchema: struct<A:string,B:string>
,None)
+- *(2) Project [A#10, B#11, B#35, B#87]
+- *(2) BroadcastHashJoin [A#10], [A#86], Inner, BuildRight
:- *(2) Filter isnotnull(A#10)
: +- InMemoryTableScan [A#10, B#11, B#35], [isnotnull(A#10)]
: +- InMemoryRelation [A#10, B#11, B#35], CachedRDDBuilder(true,10000,StorageLevel(disk, memory, deserialized, 1 replicas),*(2) Project [A#10, B#11, B#35]
+- *(2) BroadcastHashJoin [A#10], [A#34], Inner, BuildRight
:- *(2) Filter isnotnull(A#10)
: +- InMemoryTableScan [A#10, B#11], [isnotnull(A#10)]
: +- InMemoryRelation [A#10, B#11], CachedRDDBuilder(true,10000,StorageLevel(disk, memory, deserialized, 1 replicas),*(1) FileScan csv [A#10,B#11] Batched: false, Format: CSV, Location: InMemoryFileIndex[file:test.csv], PartitionFilters: [], PushedFilters: [], ReadSchema: struct<A:string,B:string>
,None)
: +- *(1) FileScan csv [A#10,B#11] Batched: false, Format: CSV, Location: InMemoryFileIndex[file:test.csv], PartitionFilters: [], PushedFilters: [], ReadSchema: struct<A:string,B:string>
+- BroadcastExchange HashedRelationBroadcastMode(List(input[0, string, false]))
+- *(1) Filter isnotnull(A#34)
+- InMemoryTableScan [A#34, B#35], [isnotnull(A#34)]
+- InMemoryRelation [A#34, B#35], CachedRDDBuilder(true,10000,StorageLevel(disk, memory, deserialized, 1 replicas),*(1) FileScan csv [A#10,B#11] Batched: false, Format: CSV, Location: InMemoryFileIndex[file:test.csv], PartitionFilters: [], PushedFilters: [], ReadSchema: struct<A:string,B:string>
,None)
+- *(1) FileScan csv [A#10,B#11] Batched: false, Format: CSV, Location: InMemoryFileIndex[file:test.csv], PartitionFilters: [], PushedFilters: [], ReadSchema: struct<A:string,B:string>
,None)
: +- *(2) Project [A#10, B#11, B#35]
: +- *(2) BroadcastHashJoin [A#10], [A#34], Inner, BuildRight
: :- *(2) Filter isnotnull(A#10)
: : +- InMemoryTableScan [A#10, B#11], [isnotnull(A#10)]
: : +- InMemoryRelation [A#10, B#11], CachedRDDBuilder(true,10000,StorageLevel(disk, memory, deserialized, 1 replicas),*(1) FileScan csv [A#10,B#11] Batched: false, Format: CSV, Location: InMemoryFileIndex[file:test.csv], PartitionFilters: [], PushedFilters: [], ReadSchema: struct<A:string,B:string>
,None)
: : +- *(1) FileScan csv [A#10,B#11] Batched: false, Format: CSV, Location: InMemoryFileIndex[file:test.csv], PartitionFilters: [], PushedFilters: [], ReadSchema: struct<A:string,B:string>
: +- BroadcastExchange HashedRelationBroadcastMode(List(input[0, string, false]))
: +- *(1) Filter isnotnull(A#34)
: +- InMemoryTableScan [A#34, B#35], [isnotnull(A#34)]
: +- InMemoryRelation [A#34, B#35], CachedRDDBuilder(true,10000,StorageLevel(disk, memory, deserialized, 1 replicas),*(1) FileScan csv [A#10,B#11] Batched: false, Format: CSV, Location: InMemoryFileIndex[file:test.csv], PartitionFilters: [], PushedFilters: [], ReadSchema: struct<A:string,B:string>
,None)
: +- *(1) FileScan csv [A#10,B#11] Batched: false, Format: CSV, Location: InMemoryFileIndex[file:test.csv], PartitionFilters: [], PushedFilters: [], ReadSchema: struct<A:string,B:string>
+- BroadcastExchange HashedRelationBroadcastMode(List(input[0, string, false]))
+- *(1) Filter isnotnull(A#86)
+- InMemoryTableScan [A#86, B#87], [isnotnull(A#86)]
+- InMemoryRelation [A#86, B#87], CachedRDDBuilder(true,10000,StorageLevel(disk, memory, deserialized, 1 replicas),*(1) FileScan csv [A#10,B#11] Batched: false, Format: CSV, Location: InMemoryFileIndex[file:test.csv], PartitionFilters: [], PushedFilters: [], ReadSchema: struct<A:string,B:string>
,None)
+- *(1) FileScan csv [A#10,B#11] Batched: false, Format: CSV, Location: InMemoryFileIndex[file:test.csv], PartitionFilters: [], PushedFilters: [], ReadSchema: struct<A:string,B:string>

With this patch, it results in:

== Physical Plan ==
InMemoryTableScan [A#10, B#11, B#35, B#87]
   +- InMemoryRelation [A#10, B#11, B#35, B#87], CachedRDDBuilder(true, 10000, StorageLevel(disk, memory, deserialized, 1 replicas))
         +- *(2) Project [A#10, B#11, B#35, B#87]
            +- *(2) BroadcastHashJoin [A#10], [A#86], Inner, BuildRight
               :- *(2) Filter isnotnull(A#10)
               :  +- InMemoryTableScan [A#10, B#11, B#35], [isnotnull(A#10)]
               :        +- InMemoryRelation [A#10, B#11, B#35], CachedRDDBuilder(true, 10000, StorageLevel(disk, memory, deserialized, 1 replicas))
               :              +- *(2) Project [A#10, B#11, B#35]
               :                 +- *(2) BroadcastHashJoin [A#10], [A#34], Inner, BuildRight
               :                    :- *(2) Filter isnotnull(A#10)
               :                    :  +- InMemoryTableScan [A#10, B#11], [isnotnull(A#10)]
               :                    :        +- InMemoryRelation [A#10, B#11], CachedRDDBuilder(true, 10000, StorageLevel(disk, memory, deserialized, 1 replicas))
               :                    :              +- *(1) FileScan csv [A#10,B#11] Batched: false, Format: CSV, Location: InMemoryFileIndex[file:test.csv], PartitionFilters: [], PushedFilters: [], ReadSchema: struct<A:string,B:string>
               :                    +- BroadcastExchange HashedRelationBroadcastMode(List(input[0, string, false]))
               :                       +- *(1) Filter isnotnull(A#34)
               :                          +- InMemoryTableScan [A#34, B#35], [isnotnull(A#34)]
               :                                +- InMemoryRelation [A#34, B#35], CachedRDDBuilder(true, 10000, StorageLevel(disk, memory, deserialized, 1 replicas))
               :                                      +- *(1) FileScan csv [A#10,B#11] Batched: false, Format: CSV, Location: InMemoryFileIndex[file:test.csv], PartitionFilters: [], PushedFilters: [], ReadSchema: struct<A:string,B:string>
               +- BroadcastExchange HashedRelationBroadcastMode(List(input[0, string, false]))
                  +- *(1) Filter isnotnull(A#86)
                     +- InMemoryTableScan [A#86, B#87], [isnotnull(A#86)]
                           +- InMemoryRelation [A#86, B#87], CachedRDDBuilder(true, 10000, StorageLevel(disk, memory, deserialized, 1 replicas))
                                 +- *(1) FileScan csv [A#10,B#11] Batched: false, Format: CSV, Location: InMemoryFileIndex[file:test.csv], PartitionFilters: [], PushedFilters: [], ReadSchema: struct<A:string,B:string>

@maropu
Member

maropu commented Jul 19, 2018

Can you add tests in DatasetCacheSuite or somewhere? cc: @gatorsmile

@gatorsmile
Member

ok to test


assert(!inMemoryRelation.simpleString.contains(dummyQueryExecution.sparkPlan.toString))
assert(inMemoryRelation.simpleString.contains(
  "CachedRDDBuilder(true, 1000, StorageLevel(memory, deserialized, 1 replicas))"))
Member

true and 1000 look confusing to end users. Can we improve it?

Member

Or we might not need the batch size in the plan.

Member

How about just comparing explain output, using a query like the one in this PR description?

Contributor Author

@gatorsmile I tried to keep this close to its default value; maybe we could do something like CachedRDDBuilder(useCompression = true, batchSize = 1000, ...)? But that would break consistency with how other case classes are logged.

Contributor Author

@maropu wouldn't that be testing the same thing, since explain calls plan.treeString, which calls elem.simpleString for every child? I think testing InMemoryRelation.simpleString also covers the other places where a plan's treeString is logged. Happy to change if you have concerns.
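
(A toy illustration of that relationship, not Spark's actual TreeNode code: treeString is assembled from each node's simpleString, so an assertion on simpleString also constrains what explain can print.)

    // Hypothetical minimal tree: treeString concatenates every node's
    // simpleString, which is why checking simpleString covers explain output.
    case class Node(simpleString: String, children: Seq[Node] = Nil) {
      def treeString: String = {
        val childLines = children.flatMap(_.treeString.split("\n")).map("   " + _)
        (simpleString +: childLines).mkString("\n")
      }
    }

    val plan = Node("InMemoryTableScan [A, B]",
      Seq(Node("InMemoryRelation [A, B], CachedRDDBuilder(...)")))
    println(plan.treeString)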

Member

maropu commented Jul 20, 2018

yea, but you don't need to fill in useCompression and batchSize in the test case.

@SparkQA

SparkQA commented Jul 20, 2018

Test build #93321 has finished for PR 21805 at commit 9ccfc4e.

  • This patch fails due to an unknown error code, -9.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA

SparkQA commented Jul 20, 2018

Test build #93341 has finished for PR 21805 at commit cf2eae2.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds the following public classes (experimental):
  • class AvroOptions(@transient val parameters: CaseInsensitiveMap[String])
  • case class ExprId(id: Long, jvmId: UUID)
  • class InputProcessor(store: StateStore)
  • case class StateData(
  • sealed trait StateManager extends Serializable

tableName: Option[String])(
@transient private var _cachedColumnBuffers: RDD[CachedBatch] = null) {

override def toString: String = s"CachedRDDBuilder($useCompression, $batchSize, $storageLevel)"
Member

My major point is whether we need to output $useCompression, $batchSize. How useful are they? Our explain output is already pretty long. Maybe we can skip them?

Member

yea, I think the output should be the same as the one in v2.3;

scala> val df = Seq((1, 2), (3, 4)).toDF("a", "b")
scala> val testDf = df.join(df, "a").join(df, "a").cache
scala> testDf.groupBy("a").count().explain
== Physical Plan ==
*(2) HashAggregate(keys=[a#309], functions=[count(1)])
+- Exchange hashpartitioning(a#309, 200)
   +- *(1) HashAggregate(keys=[a#309], functions=[partial_count(1)])
      +- *(1) InMemoryTableScan [a#309]
            +- InMemoryRelation [a#309, b#310, b#314, b#319], true, 10000, StorageLevel(disk, memory, deserialized, 1 replicas)
                  +- *(3) Project [a#60, b#61, b#212, b#217]
                     +- *(3) BroadcastHashJoin [a#60], [a#216], Inner, BuildRight
                        :- *(3) Project [a#60, b#61, b#212]
                        :  +- *(3) BroadcastHashJoin [a#60], [a#211], Inner, BuildRight
                        :     :- *(3) InMemoryTableScan [a#60, b#61]
                        :     :     +- InMemoryRelation [a#60, b#61], true, 10000, StorageLevel(disk, memory, deserialized, 1 replicas)
                        :     :           +- LocalTableScan [a#15, b#16]
                        :     +- BroadcastExchange HashedRelationBroadcastMode(List(cast(input[0, int, false] as bigint)))
                        :        +- *(1) InMemoryTableScan [a#211, b#212]
                        :              +- InMemoryRelation [a#211, b#212], true, 10000, StorageLevel(disk, memory, deserialized, 1 replicas)
                        :                    +- LocalTableScan [a#15, b#16]
                        +- BroadcastExchange HashedRelationBroadcastMode(List(cast(input[0, int, false] as bigint)))
                           +- *(2) InMemoryTableScan [a#216, b#217]
                                 +- InMemoryRelation [a#216, b#217], true, 10000, StorageLevel(disk, memory, deserialized, 1 replicas)
                                       +- LocalTableScan [a#15, b#16]

The output of the current PR is still different, so can you fix it that way? @onursatici

@onursatici
Contributor Author

@gatorsmile @maropu
I have removed batchSize and useCompression; other than that, the string representation is now the same as what we had in 2.3.


override protected def otherCopyArgs: Seq[AnyRef] = Seq(statsOfPlanToCache)

override def simpleString: String = s"InMemoryRelation(${output}, ${cacheBuilder.storageLevel})"
Member

How about s"InMemoryRelation [${Utils.truncatedString(output, ", ")}], ${cacheBuilder.storageLevel}"?

assert(inMemoryRelation.simpleString ==
  s"InMemoryRelation(${inMemoryRelation.output},"
  + " StorageLevel(memory, deserialized, 1 replicas))")
}
Member

How about just comparing explain results?

    val df = Seq((1, 2)).toDF("a", "b").cache
    val outputStream = new java.io.ByteArrayOutputStream()
    Console.withOut(outputStream) {
      df.explain(false)
    }
    assert(outputStream.toString.replaceAll("#\\d+", "#x").contains(
      "InMemoryRelation [a#x, b#x], StorageLevel(disk, memory, deserialized, 1 replicas)"))

override protected def otherCopyArgs: Seq[AnyRef] = Seq(statsOfPlanToCache)

override def simpleString: String = s"InMemoryRelation(${output}, ${cacheBuilder.storageLevel})"

Member

nit: remove the blank line

@maropu
Member

maropu commented Jul 23, 2018

LGTM

@SparkQA

SparkQA commented Jul 23, 2018

Test build #93443 has finished for PR 21805 at commit de3f63e.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA

SparkQA commented Jul 23, 2018

Test build #93444 has finished for PR 21805 at commit 2a21c80.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@gatorsmile
Member

LGTM

Thanks! Merged to master

asfgit closed this in 2edf17e Jul 23, 2018