docs
rxin committed Mar 18, 2016
commit 9519678cee18153ff0876ba8c4200e14c660b14a
156 changes: 55 additions & 101 deletions sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala
@@ -61,36 +61,48 @@ private[sql] object Dataset {
}

/**
* A [[Dataset]] is a strongly typed collection of objects that can be transformed in parallel
* using functional or relational operations.
* A [[Dataset]] is a strongly typed collection of domain-specific objects that can be transformed
* in parallel using functional or relational operations. Each Dataset also has an untyped view
* called a [[DataFrame]], which is a Dataset of [[Row]].
*
* A [[Dataset]] differs from an [[RDD]] in the following ways:
* Operations available on Datasets are divided into transformations and actions. Transformations
* are the ones that produce new Datasets, and actions are the ones that trigger computation and
 * return results. Example transformations include map, filter, select, and aggregate (groupBy).
 * Example actions include count, show, or writing data out to file systems.
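 *
 * A minimal sketch of the distinction (assuming the `Dataset[Person]` named `people` created
 * below):
 * {{{
 *   val adults = people.filter(_.age >= 21) // transformation: returns a new Dataset, runs nothing
 *   val numAdults = adults.count()          // action: triggers a job and returns a Long
 * }}}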
*
* - Internally, a [[Dataset]] is represented by a Catalyst logical plan and the data is stored
* in the encoded form. This representation allows for additional logical operations and
* enables many operations (sorting, shuffling, etc.) to be performed without deserializing to
* an object.
* - The creation of a [[Dataset]] requires the presence of an explicit [[Encoder]] that can be
* used to serialize the object into a binary format. Encoders are also capable of mapping the
* schema of a given object to the Spark SQL type system. In contrast, RDDs rely on runtime
* reflection based serialization. Operations that change the type of object stored in the
* dataset also need an encoder for the new type.
* Datasets are "lazy", i.e. computations are only triggered when an action is invoked. Internally,
* a Dataset represents a logical plan that describes the computation required to produce the data.
* When an action is invoked, Spark's query optimizer optimizes the logical plan and generates a
* physical plan for efficient execution in a parallel or distributed manner. To explore the
 * logical plan as well as the optimized physical plan, use the `explain` function.
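 *
 * For example (a sketch; the exact output format varies across versions):
 * {{{
 *   people.filter(_.age > 21).explain(true) // prints both the logical and the physical plan
 * }}}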
*
 * Compared to DataFrame in Spark 1.6.0 and earlier versions, a [[Dataset]] can be thought of as
 * a specialized DataFrame, whose elements map to a specific JVM object type instead of to a
* generic [[Row]] container. Since Spark 2.0.0, DataFrame is simply a type alias of
* `Dataset[Row]`.
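 * In code, the alias is simply:
 * {{{
 *   type DataFrame = Dataset[Row] // defined in the org.apache.spark.sql package object
 * }}}
 *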
* To efficiently support domain-specific objects, an [[Encoder]] is required. The encoder maps
 * the domain-specific type T to Spark's internal type system. For example, given a class Person
* with two fields, name (string) and age (int), an encoder is used to tell Spark to generate code
* at runtime to serialize the Person object into a binary structure. This binary structure often
 * has a much lower memory footprint and is optimized for efficiency in data processing
* (e.g. in a columnar format). To understand the internal binary representation for data, use the
* `schema` function.
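 *
 * A short sketch (assuming the implicits from the session/SQLContext are imported, so an
 * Encoder[Person] can be derived automatically):
 * {{{
 *   case class Person(name: String, age: Int)
 *   val people = Seq(Person("Ann", 30)).toDS() // uses an implicitly derived Encoder[Person]
 *   people.schema                              // a StructType describing the binary layout
 * }}}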
*
* The following example creates a `Dataset[Row]` by pointing Spark SQL to a Parquet data set.
 * There are typically two ways to create a Dataset. The most common way is by pointing Spark
* to some files on storage systems, using the `read` function available on a `SparkSession`.
* {{{
* val people = sqlContext.read.parquet("...") // in Scala
* Dataset<Row> people = sqlContext.read().parquet("...") // in Java
* val people = session.read.parquet("...").as[Person] // Scala
 *   Dataset<Person> people = session.read().parquet("...").as(Encoders.bean(Person.class)); // Java
* }}}
*
* Once created, it can be manipulated using the various domain-specific-language (DSL) functions
* defined in: [[Dataset]] (this class), [[Column]], and [[functions]].
* Datasets can also be created through transformations available on existing Datasets. For example,
 * the following creates a new Dataset by applying a map on the existing one:
* {{{
* val names = people.map(_.name) // in Scala; names is a Dataset[String]
 *   Dataset<String> names = people.map((Person p) -> p.name, Encoders.STRING()); // in Java 8
* }}}
*
* To select a column from the data frame, use `apply` method in Scala and `col` in Java.
* Dataset operations can also be untyped, through the various domain-specific-language (DSL)
* functions defined in: [[Dataset]] (this class), [[Column]], and [[functions]]. These operations
* are very similar to the operations available in the data frame abstraction in R or Python.
*
 * To select a column from the Dataset, use the `apply` method in Scala and `col` in Java.
* {{{
* val ageCol = people("age") // in Scala
* Column ageCol = people.col("age") // in Java
@@ -241,7 +253,6 @@ class Dataset[T] private[sql](
}

/**
* :: Experimental ::
 * Converts this strongly typed collection of data to a generic DataFrame. In contrast to the
 * strongly typed objects that Dataset operations work on, a DataFrame returns generic [[Row]]
* objects that allow fields to be accessed by ordinal or name.
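 *
 * For example (a sketch):
 * {{{
 *   val df = people.toDF() // Dataset[Person] => DataFrame, i.e. Dataset[Row]
 * }}}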
@@ -251,7 +262,6 @@ class Dataset[T] private[sql](
*/
// This is declared with parentheses to prevent the Scala compiler from treating
// `ds.toDF("1")` as invoking this toDF and then apply on the returned DataFrame.
@Experimental
def toDF(): DataFrame = new Dataset[Row](sqlContext, queryExecution, RowEncoder(schema))

/**
@@ -1094,7 +1104,7 @@ class Dataset[T] private[sql](
def cube(cols: Column*): GroupedData = GroupedData(toDF(), cols.map(_.expr), GroupedData.CubeType)

/**
* Groups the [[Dataset]] 2.0.0
* Groups the [[Dataset]] using the specified columns, so we can run aggregation on them.
* See [[GroupedData]] for all the available aggregate functions.
*
* This is a variant of groupBy that can only group by existing columns using column names
@@ -1257,64 +1267,10 @@ class Dataset[T] private[sql](
GroupedData(toDF(), colNames.map(colName => resolve(colName)), GroupedData.CubeType)
}

/**
* (Scala-specific) Aggregates on the entire [[Dataset]] without groups.
* {{{
* // ds.agg(...) is a shorthand for ds.groupBy().agg(...)
* ds.agg("age" -> "max", "salary" -> "avg")
* ds.groupBy().agg("age" -> "max", "salary" -> "avg")
* }}}
*
* @group untypedrel
* @since 2.0.0
*/
def agg(aggExpr: (String, String), aggExprs: (String, String)*): DataFrame = {
groupBy().agg(aggExpr, aggExprs : _*)
}

/**
* (Scala-specific) Aggregates on the entire [[Dataset]] without groups.
* {{{
* // ds.agg(...) is a shorthand for ds.groupBy().agg(...)
* ds.agg(Map("age" -> "max", "salary" -> "avg"))
* ds.groupBy().agg(Map("age" -> "max", "salary" -> "avg"))
* }}}
*
* @group untypedrel
* @since 2.0.0
*/
def agg(exprs: Map[String, String]): DataFrame = groupBy().agg(exprs)

/**
* (Java-specific) Aggregates on the entire [[Dataset]] without groups.
* {{{
* // ds.agg(...) is a shorthand for ds.groupBy().agg(...)
* ds.agg(Map("age" -> "max", "salary" -> "avg"))
* ds.groupBy().agg(Map("age" -> "max", "salary" -> "avg"))
* }}}
*
* @group untypedrel
* @since 2.0.0
*/
def agg(exprs: java.util.Map[String, String]): DataFrame = groupBy().agg(exprs)

/**
* Aggregates on the entire [[Dataset]] without groups.
* {{{
* // ds.agg(...) is a shorthand for ds.groupBy().agg(...)
* ds.agg(max($"age"), avg($"salary"))
* ds.groupBy().agg(max($"age"), avg($"salary"))
* }}}
*
* @group untypedrel
* @since 2.0.0
*/
@scala.annotation.varargs
def agg(expr: Column, exprs: Column*): DataFrame = groupBy().agg(expr, exprs : _*)

/**
* Returns a new [[Dataset]] by taking the first `n` rows. The difference between this function
* and `head` is that `head` returns an array while `limit` returns a new [[Dataset]].
* and `head` is that `head` is an action and returns an array (by triggering query execution)
* while `limit` returns a new [[Dataset]].
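 *
 * For example (a sketch):
 * {{{
 *   val firstTen = people.limit(10)     // lazy: returns a new Dataset, no job is run
 *   val firstTenArr = people.head(10)   // eager: triggers a job and returns an Array[Person]
 * }}}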
*
* @group typedrel
* @since 2.0.0
@@ -1327,6 +1283,9 @@ class Dataset[T] private[sql](
 * Returns a new [[Dataset]] containing the union of rows in this frame and another frame.
* This is equivalent to `UNION ALL` in SQL.
*
* To do a SQL-style set union (that does deduplication of elements), use this function followed
* by a [[distinct]].
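 *
 * For example (a sketch, where `ds1` and `ds2` are two Datasets of the same type):
 * {{{
 *   val unioned = ds1.unionAll(ds2).distinct() // SQL-style UNION with deduplication
 * }}}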
*
* @group typedrel
* @since 2.0.0
*/
@@ -1336,19 +1295,13 @@ class Dataset[T] private[sql](
CombineUnions(Union(logicalPlan, other.logicalPlan))
}

/**
* Returns a new [[Dataset]] containing union of rows in this frame and another frame.
* This is equivalent to `UNION ALL` in SQL.
*
* @group typedrel
* @since 2.0.0
*/
def union(other: Dataset[T]): Dataset[T] = unionAll(other)

/**
* Returns a new [[Dataset]] containing rows only in both this frame and another frame.
* This is equivalent to `INTERSECT` in SQL.
*
 * Note that equality checking is performed directly on the encoded representation of the data
* and thus is not affected by a custom `equals` function defined on `T`.
*
* @group typedrel
* @since 1.6.0
*/
@@ -1360,22 +1313,16 @@ class Dataset[T] private[sql](
* Returns a new [[Dataset]] containing rows in this frame but not in another frame.
* This is equivalent to `EXCEPT` in SQL.
*
 * Note that equality checking is performed directly on the encoded representation of the data
* and thus is not affected by a custom `equals` function defined on `T`.
*
* @group typedrel
* @since 2.0.0
*/
def except(other: Dataset[T]): Dataset[T] = withTypedPlan {
Except(logicalPlan, other.logicalPlan)
}

/**
* Returns a new [[Dataset]] containing rows in this frame but not in another frame.
* This is equivalent to `EXCEPT` in SQL.
*
* @group typedrel
* @since 2.0.0
*/
def subtract(other: Dataset[T]): Dataset[T] = except(other)

/**
* Returns a new [[Dataset]] by sampling a fraction of rows.
*
@@ -1448,6 +1395,7 @@
}

/**
* :: Experimental ::
* (Scala-specific) Returns a new [[Dataset]] where each row has been expanded to zero or more
* rows by the provided function. This is similar to a `LATERAL VIEW` in HiveQL. The columns of
* the input row are implicitly joined with each row that is output by the function.
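 *
 * A usage sketch (with a hypothetical `Word` case class and a string column named `line`):
 * {{{
 *   case class Word(word: String)
 *   val words = ds.explode($"line") { case Row(line: String) => line.split(" ").map(Word(_)) }
 * }}}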
@@ -1470,6 +1418,7 @@
* @group untypedrel
* @since 2.0.0
*/
@Experimental
def explode[A <: Product : TypeTag](input: Column*)(f: Row => TraversableOnce[A]): DataFrame = {
val schema = ScalaReflection.schemaFor[A].dataType.asInstanceOf[StructType]

@@ -1489,6 +1438,7 @@
}

/**
* :: Experimental ::
* (Scala-specific) Returns a new [[Dataset]] where a single column has been expanded to zero
* or more rows by the provided function. This is similar to a `LATERAL VIEW` in HiveQL. All
* columns of the input row are implicitly joined with each value that is output by the function.
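 *
 * A usage sketch (splitting a hypothetical string column `line` into one row per word):
 * {{{
 *   ds.explode("line", "word") { line: String => line.split(" ") }
 * }}}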
@@ -1500,6 +1450,7 @@
* @group untypedrel
* @since 2.0.0
*/
@Experimental
def explode[A, B : TypeTag](inputColumn: String, outputColumn: String)(f: A => TraversableOnce[B])
: DataFrame = {
val dataType = ScalaReflection.schemaFor[B].dataType
@@ -1770,7 +1721,7 @@ class Dataset[T] private[sql](
/**
* Concise syntax for chaining custom transformations.
* {{{
* def featurize(ds: Dataset[T]) = ...
* def featurize(ds: Dataset[T]): Dataset[U] = ...
*
* ds
* .transform(featurize)
@@ -2051,6 +2002,9 @@ class Dataset[T] private[sql](
* Returns a new [[Dataset]] that contains only the unique rows from this [[Dataset]].
* This is an alias for `dropDuplicates`.
*
 * Note that equality checking is performed directly on the encoded representation of the data
* and thus is not affected by a custom `equals` function defined on `T`.
*
* @group typedrel
* @since 2.0.0
*/
@@ -2136,7 +2090,7 @@ class Dataset[T] private[sql](
* @group rdd
* @since 1.6.0
*/
def javaRDD: JavaRDD[T] = toJavaRDD
def javaRDD: JavaRDD[T] = rdd.toJavaRDD()

/**
* Registers this [[Dataset]] as a temporary table using the given name. The lifetime of this
@@ -23,9 +23,8 @@ import scala.concurrent.duration._
import org.apache.spark.broadcast
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.expressions.Attribute
import org.apache.spark.sql.catalyst.plans.physical.{BroadcastMode, BroadcastPartitioning, Partitioning}
import org.apache.spark.sql.execution.{SparkPlan, SQLExecution, UnaryNode}
import org.apache.spark.sql.execution.{SparkPlan, SQLExecution}
import org.apache.spark.util.ThreadUtils

/**
@@ -30,7 +30,11 @@ import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.types.StructType

/**
* An interface for exchanges.
* Base class for operators that exchange data among multiple threads or processes.
*
* Exchanges are the key class of operators that enable parallelism. Although the implementation
* differs significantly, the concept is similar to the exchange operator described in
* "Volcano -- An Extensible and Parallel Query Evaluation System" by Goetz Graefe.
*/
abstract class Exchange extends UnaryNode {
override def output: Seq[Attribute] = child.output