-
Notifications
You must be signed in to change notification settings - Fork 29k
[SPARK-11970] [SQL] Adding JoinType into JoinWith and support Sample in Dataset API #9921
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from 5 commits
118b0bc
50f5320
980b153
d21b0bc
0d62b5e
458cf67
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -20,16 +20,16 @@ package org.apache.spark.sql | |
| import scala.collection.JavaConverters._ | ||
|
|
||
| import org.apache.spark.annotation.Experimental | ||
| import org.apache.spark.rdd.RDD | ||
| import org.apache.spark.api.java.function._ | ||
|
|
||
| import org.apache.spark.rdd.RDD | ||
| import org.apache.spark.sql.catalyst.encoders._ | ||
| import org.apache.spark.sql.catalyst.expressions._ | ||
| import org.apache.spark.sql.catalyst.analysis.UnresolvedAlias | ||
| import org.apache.spark.sql.catalyst.plans.Inner | ||
| import org.apache.spark.sql.catalyst.plans.JoinType | ||
| import org.apache.spark.sql.catalyst.plans.logical._ | ||
| import org.apache.spark.sql.execution.{Queryable, QueryExecution} | ||
| import org.apache.spark.sql.types.StructType | ||
| import org.apache.spark.util.Utils | ||
|
|
||
| /** | ||
| * :: Experimental :: | ||
|
|
@@ -83,7 +83,6 @@ class Dataset[T] private[sql]( | |
|
|
||
| /** | ||
| * Returns the schema of the encoded form of the objects in this [[Dataset]]. | ||
| * | ||
| * @since 1.6.0 | ||
| */ | ||
| def schema: StructType = resolvedTEncoder.schema | ||
|
|
@@ -185,7 +184,6 @@ class Dataset[T] private[sql]( | |
| * .transform(featurize) | ||
| * .transform(...) | ||
| * }}} | ||
| * | ||
| * @since 1.6.0 | ||
| */ | ||
| def transform[U](t: Dataset[T] => Dataset[U]): Dataset[U] = t(this) | ||
|
|
@@ -453,6 +451,22 @@ class Dataset[T] private[sql]( | |
| c5: TypedColumn[T, U5]): Dataset[(U1, U2, U3, U4, U5)] = | ||
| selectUntyped(c1, c2, c3, c4, c5).asInstanceOf[Dataset[(U1, U2, U3, U4, U5)]] | ||
|
|
||
|
|
||
| /** | ||
| * Returns a new [[Dataset]] by sampling a fraction of rows. | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. rows -> records |
||
| * @since 1.6.0 | ||
| */ | ||
| def sample(withReplacement: Boolean, fraction: Double, seed: Long) : Dataset[T] = | ||
| withPlan(Sample(0.0, fraction, withReplacement, seed, _)) | ||
|
|
||
| /** | ||
| * Returns a new [[Dataset]] by sampling a fraction of rows, using a random seed. | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. rows -> records |
||
| * @since 1.6.0 | ||
| */ | ||
| def sample(withReplacement: Boolean, fraction: Double) : Dataset[T] = { | ||
| sample(withReplacement, fraction, Utils.random.nextLong) | ||
| } | ||
|
|
||
| /* **************** * | ||
| * Set operations * | ||
| * **************** */ | ||
|
|
@@ -511,13 +525,17 @@ class Dataset[T] private[sql]( | |
| * types as well as working with relational data where either side of the join has column | ||
| * names in common. | ||
| * | ||
| * @param other Right side of the join. | ||
| * @param condition Join expression. | ||
| * @param joinType One of: `inner`, `outer`, `left_outer`, `right_outer`, `leftsemi`. | ||
| * @since 1.6.0 | ||
| */ | ||
| def joinWith[U](other: Dataset[U], condition: Column): Dataset[(T, U)] = { | ||
| def joinWith[U](other: Dataset[U], condition: Column, joinType: String): Dataset[(T, U)] = { | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. you need to update the documentation to explain what options are available for joinType. |
||
| val left = this.logicalPlan | ||
| val right = other.logicalPlan | ||
|
|
||
| val joined = sqlContext.executePlan(Join(left, right, Inner, Some(condition.expr))) | ||
| val joined = sqlContext.executePlan(Join(left, right, joinType = | ||
| JoinType(joinType), Some(condition.expr))) | ||
| val leftOutput = joined.analyzed.output.take(left.output.length) | ||
| val rightOutput = joined.analyzed.output.takeRight(right.output.length) | ||
|
|
||
|
|
@@ -540,6 +558,18 @@ class Dataset[T] private[sql]( | |
| } | ||
| } | ||
|
|
||
| /** | ||
| * Using inner equi-join to join this [[Dataset]] returning a [[Tuple2]] for each pair | ||
| * where `condition` evaluates to true | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. missed a period |
||
| * | ||
| * @param other Right side of the join. | ||
| * @param condition Join expression. | ||
| * @since 1.6.0 | ||
| */ | ||
| def joinWith[U](other: Dataset[U], condition: Column): Dataset[(T, U)] = { | ||
| joinWith(other, condition, "inner") | ||
| } | ||
|
|
||
| /* ************************** * | ||
| * Gather to Driver Actions * | ||
| * ************************** */ | ||
|
|
@@ -584,7 +614,6 @@ class Dataset[T] private[sql]( | |
| * | ||
| * Running take requires moving data into the application's driver process, and doing so with | ||
| * a very large `n` can crash the driver process with OutOfMemoryError. | ||
| * | ||
| * @since 1.6.0 | ||
| */ | ||
| def take(num: Int): Array[T] = withPlan(Limit(Literal(num), _)).collect() | ||
|
|
@@ -594,7 +623,6 @@ class Dataset[T] private[sql]( | |
| * | ||
| * Running take requires moving data into the application's driver process, and doing so with | ||
| * a very large `n` can crash the driver process with OutOfMemoryError. | ||
| * | ||
| * @since 1.6.0 | ||
| */ | ||
| def takeAsList(num: Int): java.util.List[T] = java.util.Arrays.asList(take(num) : _*) | ||
|
|
||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -185,17 +185,23 @@ class DatasetSuite extends QueryTest with SharedSQLContext { | |
| val ds2 = Seq(1, 2).toDS().as("b") | ||
|
|
||
| checkAnswer( | ||
| ds1.joinWith(ds2, $"a.value" === $"b.value"), | ||
| ds1.joinWith(ds2, $"a.value" === $"b.value", "inner"), | ||
| (1, 1), (2, 2)) | ||
| } | ||
|
|
||
| test("joinWith, expression condition") { | ||
| val ds1 = Seq(ClassData("a", 1), ClassData("b", 2)).toDS() | ||
| val ds2 = Seq(("a", 1), ("b", 2)).toDS() | ||
| test("joinWith, expression condition, outer join") { | ||
| val nullInteger = null.asInstanceOf[Integer] | ||
| val nullString = null.asInstanceOf[String] | ||
| val ds1 = Seq(ClassNullableData("a", new Integer(1)), | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. nit: we can just pass in |
||
| ClassNullableData("c", new Integer(3))).toDS() | ||
| val ds2 = Seq(("a", new Integer(1)), | ||
| ("b", new Integer(2))).toDS() | ||
|
|
||
| checkAnswer( | ||
| ds1.joinWith(ds2, $"_1" === $"a"), | ||
| (ClassData("a", 1), ("a", 1)), (ClassData("b", 2), ("b", 2))) | ||
| ds1.joinWith(ds2, $"_1" === $"a", "outer"), | ||
| (ClassNullableData("a", new Integer(1)), ("a", new Integer(1))), | ||
| (ClassNullableData("c", new Integer(3)), (nullString, nullInteger)), | ||
| (ClassNullableData(nullString, nullInteger), ("b", new Integer(2)))) | ||
| } | ||
|
|
||
| test("joinWith tuple with primitive, expression") { | ||
|
|
@@ -225,7 +231,6 @@ class DatasetSuite extends QueryTest with SharedSQLContext { | |
| ds1.joinWith(ds2, $"a._2" === $"b._2").as("ab").joinWith(ds3, $"ab._1._2" === $"c._2"), | ||
| ((("a", 1), ("a", 1)), ("a", 1)), | ||
| ((("b", 2), ("b", 2)), ("b", 2))) | ||
|
|
||
| } | ||
|
|
||
| test("groupBy function, keys") { | ||
|
|
@@ -367,6 +372,22 @@ class DatasetSuite extends QueryTest with SharedSQLContext { | |
| 1 -> "a", 2 -> "bc", 3 -> "d") | ||
| } | ||
|
|
||
| test("sample with replacement") { | ||
| val n = 100 | ||
| val data = sparkContext.parallelize(1 to n, 2).toDS() | ||
| checkAnswer( | ||
| data.sample(withReplacement = true, 0.05, seed = 13), | ||
| Seq(5, 10, 52, 73): _*) | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. instead of |
||
| } | ||
|
|
||
| test("sample without replacement") { | ||
| val n = 100 | ||
| val data = sparkContext.parallelize(1 to n, 2).toDS() | ||
| checkAnswer( | ||
| data.sample(withReplacement = false, 0.05, seed = 13), | ||
| Seq(3, 17, 27, 58, 62): _*) | ||
| } | ||
|
|
||
| test("SPARK-11436: we should rebind right encoder when join 2 datasets") { | ||
| val ds1 = Seq("1", "2").toDS().as("a") | ||
| val ds2 = Seq(2, 3).toDS().as("b") | ||
|
|
@@ -440,6 +461,7 @@ class DatasetSuite extends QueryTest with SharedSQLContext { | |
|
|
||
|
|
||
| case class ClassData(a: String, b: Int) | ||
| case class ClassNullableData(a: String, b: Integer) | ||
|
|
||
| /** | ||
| * A class used to test serialization using encoders. This class throws exceptions when using | ||
|
|
||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
remove the extra blank line here