Conversation

@jamisonbennett (Contributor) commented Dec 28, 2018

What changes were proposed in this pull request?

Allow multiple spark.sql.extensions to be specified in the
configuration.

How was this patch tested?

New tests are added.
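For example (with hypothetical class names), two extensions could be configured via a comma-separated list in spark-defaults.conf:

```
spark.sql.extensions  com.example.MyExtensions,com.example.MyExtensions2
```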

@jamisonbennett (Contributor Author)

This is my original work and I license the work to the project under the project’s open source license.
I have signed an ICLA with apache.org.

@jamisonbennett (Contributor Author)

@sameeragarwal , @gatorsmile , @RussellSpitzer please review my changes.

@gatorsmile (Member)

ok to test

@RussellSpitzer (Member)

Lgtm +1

@RussellSpitzer (Member)

Actually, how does this work from a spark-defaults perspective? Does a comma-separated string work?

@gatorsmile (Member)

@RussellSpitzer See this example.

    val conf = new SparkConf()
    val seq = ConfigBuilder(testKey("seq")).stringConf.toSequence.createWithDefault(Seq())
    conf.set(seq.key, "1,,2, 3 , , 4")
    assert(conf.get(seq) === Seq("1", "2", "3", "4"))
    conf.set(seq, Seq("1", "2"))
    assert(conf.get(seq) === Seq("1", "2"))
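The trimming and empty-entry-dropping behavior shown above can be sketched in plain Scala (`parseSeq` is a hypothetical helper, not Spark's actual `ConfigBuilder` implementation):

```scala
// Hypothetical sketch of the comma-separated parsing semantics shown
// above: split on commas, trim whitespace, and drop empty entries.
def parseSeq(s: String): Seq[String] =
  s.split(",").map(_.trim).filter(_.nonEmpty).toSeq

assert(parseSeq("1,,2, 3 , , 4") == Seq("1", "2", "3", "4"))
assert(parseSeq("a.b.MyExtensions,a.b.MyExtensions2") ==
  Seq("a.b.MyExtensions", "a.b.MyExtensions2"))
```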

@dongjoon-hyun (Member)

This PR looks useful. Thank you for your first contribution, @jamisonbennett .

assert(session.sessionState.functionRegistry
.lookupFunction(MyExtensions.myFunction._1).isDefined)
assert(session.sessionState.functionRegistry
.lookupFunction(MyExtensions2.myFunction._1).isDefined)
Member

So, now we have multiple extension registrations. The order of extension names might have side-effects.

  • Can we have a test case for duplicated extension names? MyExtensions2 and MyExtensions2?
  • Can we have a negative test case for function name conflicts? MyExtensions2.myFunction and MyExtensions3.myFunction?

Member

I think the order matters, but we need to discuss and document the behavior when we have name conflicts.

For example, the same rule will be added twice in extendedResolutionRules. Is it desired?

class MyExtensions extends (SparkSessionExtensions => Unit) {
  def apply(e: SparkSessionExtensions): Unit = {
    e.injectResolutionRule(MyRule)
  }
}

class MyExtensions2 extends (SparkSessionExtensions => Unit) {
  def apply(e: SparkSessionExtensions): Unit = {
    e.injectResolutionRule(MyRule)
  }
}

Member

Yep. If there is no reason to allow that, we had better disallow that by design before this PR.

Contributor

There are use cases where you want to execute rules in a certain order. So I think it is reasonable to add the same rule multiple times. If you want more control you could even create 'micro' optimizer batches by calling multiple rules from one rule.

I think this is more a matter of proper documentation than one where we should explicitly block things. Also note that this is a pretty advanced feature and by this stage users are expected to know what they are doing.
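The "micro batch" idea mentioned above can be sketched with plain function composition (types simplified to strings for illustration; a real Catalyst Rule[LogicalPlan] composes the same way):

```scala
// Hypothetical sketch: one composite rule that applies several
// rules in a fixed order, giving fine-grained control over ordering.
type Plan = String
type Rule = Plan => Plan

def microBatch(rules: Seq[Rule]): Rule =
  plan => rules.foldLeft(plan)((p, rule) => rule(p))

val upper: Rule = _.toUpperCase
val exclaim: Rule = _ + "!"

// Rules run left to right: upper first, then exclaim.
assert(microBatch(Seq(upper, exclaim))("plan") == "PLAN!")
```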

Contributor Author

Prior to this change, it was possible to programmatically register multiple extensions, but it was not possible to do so through the spark.sql.extensions configuration (although this wasn't documented or tested until this pull request). For example, the following works without this pull request:

SparkSession.builder()
  .master("..")
  .withExtensions(sparkSessionExtensions1)
  .withExtensions(sparkSessionExtensions2)
  .getOrCreate()

So I think conflicting function names are already currently possible (but not documented). In the following cases:

  1. Conflicting function names are registered by calling .withExtensions() multiple times
  2. An extension accidentally registers a function that was already registered with the builtin functions
  3. An extension accidentally registers a function multiple times by calling injectFunction(myFunction)

As for the order, it looks to me like the last function to be stored with conflicting names is the one which is retrieved:

class SimpleFunctionRegistry extends FunctionRegistry {

  @GuardedBy("this")
  private val functionBuilders =
    new mutable.HashMap[FunctionIdentifier, (ExpressionInfo, FunctionBuilder)]

  override def registerFunction(
      name: FunctionIdentifier,
      info: ExpressionInfo,
      builder: FunctionBuilder): Unit = synchronized {
    functionBuilders.put(normalizeFuncName(name), (info, builder))
  }

  // ...
}

I will update this PR to document the order of operations and what happens on conflicts. If we need to explicitly block duplicate functions from being registered, I can temporarily drop this PR and see about making those changes first.
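The last-wins behavior of SimpleFunctionRegistry can be sketched with a plain mutable.HashMap (strings stand in for the real ExpressionInfo/FunctionBuilder values):

```scala
import scala.collection.mutable

// Sketch of HashMap.put's semantics: put returns the previously
// stored value, if any, so a replacement can be detected and logged.
val builders = new mutable.HashMap[String, String]
assert(builders.put("myFunction", "first").isEmpty)  // no prior entry
val previous = builders.put("myFunction", "second")  // replaces "first"
assert(previous.contains("first"))
assert(builders("myFunction") == "second")           // the last registration wins
```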

Member

Thanks for explaining it. We do not need to block it, but we might need to detect and throw a warning message at least.

More importantly, we need to document the current behavior and also add a test case to ensure the future changes will not break it. In the future, we can revisit the current behavior and make a change if needed.

Contributor Author

  • Can we have a test case for duplicated extension names? Done
  • Can we have a negative test case for function name conflicts? Done

I added documentation for the behavior.
I added a warning message if a registered function is replaced.
I added a test case for the ordering.


val SPARK_SESSION_EXTENSIONS = buildStaticConf("spark.sql.extensions")
.doc("Name of the class used to configure Spark Session extensions. The class should " +
.doc("List of the class names used to configure Spark Session extensions. The classes should " +
Contributor

Please document in what order rules from multiple extensions are executed, how listeners are invoked and how functions are registered (last one wins?). Be sure to cover what happens if you add duplicate listeners/rules/functions etc...

Contributor

A suggestion to update the comment: replace 'List of the class names used to configure Spark Session extensions. The classes should implement Function1[SparkSessionExtension, Unit], and must have a no-args constructor.' with 'A comma-separated list of classes that implement Function1[SparkSessionExtension, Unit] used to configure Spark Session extensions. The classes must have a no-args constructor.'

Contributor Author

I updated the documentation as suggested with respect to the ordering. I think the comment about "listeners" is related to code that is close in proximity to the code for this pull request, but it is a different configuration item. So I didn't update the spark.sql.queryExecutionListeners documentation. If you think that documentation should be updated, let me know if I should include it as a part of this pull request or as a separate pull request.

@SparkQA

SparkQA commented Dec 28, 2018

Test build #100511 has finished for PR 23398 at commit cef8eb0.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds the following public classes (experimental):
  • .doc(\"List of the class names used to configure Spark Session extensions. The classes should \" +

@SparkQA

SparkQA commented Dec 30, 2018

Test build #100544 has finished for PR 23398 at commit 689a4d2.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds the following public classes (experimental):
  • class SimpleFunctionRegistry extends FunctionRegistry with Logging

builder: FunctionBuilder): Unit = synchronized {
functionBuilders.put(normalizeFuncName(name), (info, builder))
val normalizedName = normalizeFuncName(name)
if (functionBuilders.put(normalizedName, (info, builder)).isDefined) {
Contributor

It would be great if we can check if the new function and the old function are different. This will help to increase the signal of the error message.

Contributor Author

I added a check which will only log a warning if different function objects are registered with the same name. The "allow an extension to be duplicated" unit test that I previously added registers the same object twice; this test no longer prints the warning. The "use the last registered function name when there are duplicates" unit test that I previously added registers different functions with the same name; this test prints the warning.

This addresses the comments for apache#23398
@SparkQA

SparkQA commented Jan 2, 2019

Test build #100643 has finished for PR 23398 at commit d89cfd9.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@bersprockets (Contributor)

retest this please

@SparkQA

SparkQA commented Jan 2, 2019

Test build #100647 has finished for PR 23398 at commit d89cfd9.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@HyukjinKwon (Member) left a comment

Looks okay. It's unstable + experimental FWIW.

@felixcheung (Member)

@hvanhovell @gatorsmile ?

This addresses the comments for apache#23398
@SparkQA

SparkQA commented Jan 5, 2019

Test build #100800 has finished for PR 23398 at commit fb4ad34.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

assert(session.sessionState.analyzer.extendedCheckRules.containsSlice(orderedCheckRules))
assert(session.sessionState.optimizer.batches.flatMap(_.rules).filter(orderedRules.contains)
.containsSlice(orderedRules ++ orderedRules)) // The optimizer rules are duplicated
assert(session.sessionState.sqlParser == parser)
Contributor

In all these asserts, use === and !==.

Member

That's actually arguable, Vanzin. Some people prefer === whereas some prefer ==. === doesn't always seem to report a better error message, given my tests. See also databricks/scala-style-guide#36.

Contributor Author

Based on databricks/scala-style-guide#36, it looks like == might now be preferred over ===. For what it's worth, it seems that in the cases for this test == produces reasonable error messages such as MyParser(org.apache.spark.sql.SparkSession@6e8a9c30,org.apache.spark.sql.catalyst.parser.CatalystSqlParser$@5d01ea21) did not equal IntentionalErrorThatIInsertedHere and 2 did not equal 3. So please let me know if there is newer guidance to use === and I can make the changes.

Contributor

== and === are not equivalent, and we use === in tests. The latter, for example, handles arrays correctly, which the former does not.
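The array difference mentioned above can be demonstrated in plain Scala:

```scala
// Scala's == on Array compares references, so structurally equal
// arrays are not == to each other; ScalaTest's === compares arrays
// element-wise, which sameElements approximates here.
val a = Array(1, 2, 3)
val b = Array(1, 2, 3)
assert(!(a == b))          // reference comparison: not equal
assert(a.sameElements(b))  // element-wise comparison: equal
```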

Contributor Author

Thanks for the follow up, I was not aware of that difference. I updated the tests to use === and !== as originally recommended.

Member

@vanzin, where does it say we use ===? If it's practically yes, let's document it.

functionBuilders.put(normalizedName, newFunction) match {
case Some(previousFunction) if previousFunction != newFunction =>
logWarning(s"The function $normalizedName replaced a previously registered function.")
case _ => Unit
Member

This returns the Unit companion object; it can just be removed.

Contributor Author

I made a change here which I think was what you were recommending. I didn't want to remove the case _ => otherwise I think it may result in a non-exhaustive match exception. If you wanted me to remove the entire match statement, just let me know.
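A minimal sketch of the resulting code (with hypothetical simplified types): an empty case body already evaluates to Unit, so `case _ =>` keeps the match exhaustive without the explicit `Unit`:

```scala
import scala.collection.mutable

val functionBuilders = new mutable.HashMap[String, String]
var warnings = 0

def registerFunction(name: String, builder: String): Unit =
  functionBuilders.put(name, builder) match {
    case Some(previous) if previous != builder =>
      warnings += 1 // stands in for logWarning(...)
    case _ => // empty body: evaluates to Unit
  }

registerFunction("f", "a")
registerFunction("f", "a") // same builder again: no warning
registerFunction("f", "b") // different builder: warning
assert(warnings == 1)
```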

new ExpressionInfo("noClass", "myDb", "myFunction", "usage", "extended usage" ),
new ExpressionInfo("noClass", "myDb", "myFunction", "usage", "extended usage"),
(myArgs: Seq[Expression]) => Literal(5, IntegerType))

Member

I would remove this newline. Looks unrelated.

Contributor Author

Done

e.injectPostHocResolutionRule(MyRule2)
e.injectCheckRule(MyCheckRule2)
e.injectOptimizerRule(MyRule2)
e.injectParser((_, _) => CatalystSqlParser)
Member

nit: e.injectParser((_: SparkSession, _: ParserInterface) => CatalystSqlParser)

Contributor Author

I also made the suggested change in 2 other places in this file so that it is consistent.

val myFunction = (FunctionIdentifier("myFunction"),
new ExpressionInfo("noClass", "myDb", "myFunction", "usage", "extended usage" ),
new ExpressionInfo("noClass", "myDb", "myFunction", "usage", "extended usage"),
(myArgs: Seq[Expression]) => Literal(5, IntegerType))
Member

nit: (_: Seq[Expression]) => Literal(5, IntegerType))

Contributor Author

Done

object MyExtensions2Duplicate {

val myFunction = (FunctionIdentifier("myFunction2"),
new ExpressionInfo("noClass", "myDb", "myFunction2", "usage", "last wins" ),
Member

nit "last wins" -> "extended usage"

Member

nit: " ) -> ")

Contributor Author

I made both changes and also updated one of the tests to validate the ExpressionInfo object rather than the extended usage text.

object MyExtensions2 {

val myFunction = (FunctionIdentifier("myFunction2"),
new ExpressionInfo("noClass", "myDb", "myFunction2", "usage", "extended usage" ),
Member

nit: " ) -> ")

Contributor Author

Done

* }}}
*
* The extensions can also be used by setting the Spark SQL configuration property
* spark.sql.extensions, for example:
Member

nit: spark.sql.extensions -> 'spark.sql.extensions'

Shall we also mention multiple rule can be set via comma separated?

Contributor Author

I added the recommended documentation updates and also added code marks around withExtensions and fixed the typo there. I didn't think it was necessary to provide the example of using the comma-separated string but I did note it in the documentation.

This addresses the comments for apache#23398
@SparkQA

SparkQA commented Jan 9, 2019

Test build #100971 has finished for PR 23398 at commit 9c0181d.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@HyukjinKwon (Member)

retest this please

@SparkQA

SparkQA commented Jan 9, 2019

Test build #100968 has finished for PR 23398 at commit 65a5f3f.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA

SparkQA commented Jan 9, 2019

Test build #100977 has finished for PR 23398 at commit 9c0181d.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

This addresses the comments for apache#23398
@SparkQA

SparkQA commented Jan 9, 2019

Test build #100986 has finished for PR 23398 at commit deaf73e.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@HyukjinKwon (Member)

Merged to master.

For === vs ==: if that's not documented, let's document it in databricks/scala-style-guide, which is the official style guide we refer to. Until then, I think both are okay; both are already being used here and there.

@asfgit asfgit closed this in 1a47233 Jan 10, 2019
jackylee-ch pushed a commit to jackylee-ch/spark that referenced this pull request Feb 18, 2019
## What changes were proposed in this pull request?

Allow multiple spark.sql.extensions to be specified in the
configuration.

## How was this patch tested?

New tests are added.

Closes apache#23398 from jamisonbennett/SPARK-26493.

Authored-by: Jamison Bennett <[email protected]>
Signed-off-by: Hyukjin Kwon <[email protected]>
AFFogarty pushed a commit to AFFogarty/spark-1 that referenced this pull request Jun 6, 2020
Allow multiple spark.sql.extensions to be specified in the
configuration.

New tests are added.

Closes apache#23398 from jamisonbennett/SPARK-26493.

Authored-by: Jamison Bennett <[email protected]>
Signed-off-by: Hyukjin Kwon <[email protected]>
