Merged
Changes from 1 commit
75 commits
0092abb
Some minor cleanup after SPARK-4550.
sryza May 6, 2015
1fd31ba
[SPARK-6231][SQL/DF] Automatically resolve join condition ambiguity f…
rxin May 6, 2015
51b3d41
Revert "[SPARK-3454] separate json endpoints for data in the UI"
rxin May 6, 2015
a466944
[SPARK-6841] [SPARKR] add support for mean, median, stdev etc.
hqzizania May 6, 2015
ba2b566
[SPARK-7358][SQL] Move DataFrame mathfunctions into functions
brkyvz May 6, 2015
7b14578
[SPARK-6267] [MLLIB] Python API for IsotonicRegression
yanboliang May 6, 2015
9f019c7
[SPARK-7384][Core][Tests] Fix flaky tests for distributed mode in Bro…
zsxwing May 6, 2015
32cdc81
[SPARK-6940] [MLLIB] Add CrossValidator to Python ML pipeline API
mengxr May 6, 2015
322e7e7
[SQL] JavaDoc update for various DataFrame functions.
rxin May 6, 2015
150f671
[SPARK-5456] [SQL] fix decimal compare for jdbc rdd
adrian-wang May 6, 2015
c3eb441
[SPARK-6201] [SQL] promote string and do widen types for IN
adrian-wang May 6, 2015
f2c4708
[SPARK-1442] [SQL] Window Function Support for Spark SQL
yhuai May 6, 2015
002c123
[SPARK-7311] Introduce internal Serializer API for determining if ser…
JoshRosen May 6, 2015
845d1d4
Add `Private` annotation.
JoshRosen May 6, 2015
7740996
[HOT-FIX] Move HiveWindowFunctionQuerySuite.scala to hive compatibili…
yhuai May 6, 2015
1ad04da
[SPARK-5995] [ML] Make Prediction dev API public
jkbradley May 6, 2015
fbf1f34
[HOT FIX] [SPARK-7418] Ignore flaky SparkSubmitUtilsSuite test
May 7, 2015
4e93042
[SPARK-6799] [SPARKR] Remove SparkR RDD examples, add dataframe examples
shivaram May 7, 2015
316a5c0
[SPARK-7396] [STREAMING] [EXAMPLE] Update KafkaWordCountProducer to u…
jerryshao May 7, 2015
8fa6829
[SPARK-7371] [SPARK-7377] [SPARK-7408] DAG visualization addendum (#5…
May 7, 2015
71a452b
[HOT FIX] For DAG visualization #5954
May 7, 2015
14502d5
[SPARK-7405] [STREAMING] Fix the bug that ReceiverInputDStream doesn'…
zsxwing May 7, 2015
773aa25
[SPARK-7432] [MLLIB] disable cv doctest
mengxr May 7, 2015
9cfa9a5
[SPARK-6812] [SPARKR] filter() on DataFrame does not work as expected.
May 7, 2015
2d6612c
[SPARK-5938] [SPARK-5443] [SQL] Improve JsonRDD performance
May 7, 2015
cfdadcb
[SPARK-7430] [STREAMING] [TEST] General improvements to streaming tes…
tdas May 7, 2015
01187f5
[SPARK-7217] [STREAMING] Add configuration to control the default beh…
tdas May 7, 2015
fa8fddf
[SPARK-7295][SQL] bitwise operations for DataFrame DSL
Shiti May 7, 2015
fae4e2d
[SPARK-7035] Encourage __getitem__ over __getattr__ on column access …
ksonj May 7, 2015
8b6b46e
[SPARK-7421] [MLLIB] OnlineLDA cleanups
jkbradley May 7, 2015
4f87e95
[SPARK-7429] [ML] Params cleanups
jkbradley May 7, 2015
ed9be06
[SPARK-7330] [SQL] avoid NPE at jdbc rdd
adrian-wang May 7, 2015
9e2ffb1
[SPARK-7388] [SPARK-7383] wrapper for VectorAssembler in Python
brkyvz May 7, 2015
068c315
[SPARK-7118] [Python] Add the coalesce Spark SQL function available i…
May 7, 2015
1712a7c
[SPARK-6093] [MLLIB] Add RegressionMetrics in PySpark/MLlib
yanboliang May 7, 2015
5784c8d
[SPARK-1442] [SQL] [FOLLOW-UP] Address minor comments in Window Funct…
yhuai May 7, 2015
dec8f53
[SPARK-7116] [SQL] [PYSPARK] Remove cache() causing memory leak
ksonj May 7, 2015
074d75d
[SPARK-5213] [SQL] Remove the duplicated SparkSQLParser
chenghao-intel May 7, 2015
0c33bf8
[SPARK-7399] [SPARK CORE] Fixed compilation error in scala 2.11
May 7, 2015
4eecf55
[SPARK-7373] [MESOS] Add docker support for launching drivers in meso…
tnachen May 7, 2015
f121651
[SPARK-7391] DAG visualization: auto expand if linked from another viz
May 7, 2015
88717ee
[SPARK-7347] DAG visualization: add tooltips to RDDs
May 7, 2015
347a329
[SPARK-7328] [MLLIB] [PYSPARK] Pyspark.mllib.linalg.Vectors: Missing …
MechCoder May 7, 2015
658a478
[SPARK-5726] [MLLIB] Elementwise (Hadamard) Vector Product Transformer
ogeagla May 7, 2015
e43803b
[SPARK-6948] [MLLIB] compress vectors in VectorAssembler
mengxr May 7, 2015
97d1182
[SQL] [MINOR] make star and multialias extend NamedExpression
scwf May 7, 2015
ea3077f
[SPARK-7277] [SQL] Throw exception if the property mapred.reduce.task…
viirya May 7, 2015
937ba79
[SPARK-5281] [SQL] Registering table on RDD is giving MissingRequirem…
dragos May 7, 2015
35f0173
[SPARK-2155] [SQL] [WHEN D THEN E] [ELSE F] add CaseKeyWhen for "CASE…
cloud-fan May 7, 2015
88063c6
[SPARK-7450] Use UNSAFE.getLong() to speed up BitSetMethods#anySet()
tedyu May 7, 2015
22ab70e
[SPARK-7305] [STREAMING] [WEBUI] Make BatchPage show friendly informa…
zsxwing May 8, 2015
cd1d411
[SPARK-6908] [SQL] Use isolated Hive client
marmbrus May 8, 2015
92f8f80
[SPARK-7452] [MLLIB] fix bug in topBykey and update test
coderxiang May 8, 2015
3af423c
[SPARK-6986] [SQL] Use Serializer2 in more cases.
yhuai May 8, 2015
714db2e
[SPARK-7470] [SQL] Spark shell SQLContext crashes without hive
May 8, 2015
f496bf3
[SPARK-7232] [SQL] Add a Substitution batch for spark sql analyzer
scwf May 8, 2015
c2f0821
[SPARK-7392] [CORE] bugfix: Kryo buffer size cannot be larger than 2M
liyezhang556520 May 8, 2015
ebff732
[SPARK-6869] [PYSPARK] Add pyspark archives path to PYTHONPATH
lianhuiwang May 8, 2015
c796be7
[SPARK-3454] separate json endpoints for data in the UI
squito May 8, 2015
f5ff4a8
[SPARK-7383] [ML] Feature Parity in PySpark for ml.features
brkyvz May 8, 2015
65afd3c
[SPARK-7474] [MLLIB] update ParamGridBuilder doctest
mengxr May 8, 2015
008a60d
[SPARK-6824] Fill the docs for DataFrame API in SparkR
hqzizania May 8, 2015
35d6a99
[SPARK-7436] Fixed instantiation of custom recovery mode factory and …
jacek-lewandowski May 8, 2015
a1ec08f
[SPARK-7298] Harmonize style of new visualizations
mateiz May 8, 2015
2d05f32
[SPARK-7133] [SQL] Implement struct, array, and map field accessor
cloud-fan May 8, 2015
4b3bb0e
[SPARK-6627] Finished rename to ShuffleBlockResolver
kayousterhout May 8, 2015
25889d8
[SPARK-7490] [CORE] [Minor] MapOutputTracker.deserializeMapStatuses: …
May 8, 2015
dc71e47
[MINOR] Ignore python/lib/pyspark.zip
zsxwing May 8, 2015
c45c09b
[WEBUI] Remove debug feature for vis.js
sarutak May 8, 2015
4e7360e
[SPARK-7489] [SPARK SHELL] Spark shell crashes when compiled with sca…
vinodkc May 8, 2015
31da40d
[MINOR] Defeat early garbage collection of test suite variable
tellison May 8, 2015
3b0c5e7
[SPARK-7466] DAG visualization: fix orphan nodes
May 8, 2015
9042f8f
[MINOR] [CORE] Allow History Server to read kerberos opts from config…
May 8, 2015
5467c34
[SPARK-7378] [CORE] Handle deep links to unloaded apps.
May 8, 2015
90527f5
[SPARK-7390] [SQL] Only merge other CovarianceCounter when its count …
viirya May 8, 2015
[SPARK-7133] [SQL] Implement struct, array, and map field accessor
This is the first step: generalize UnresolvedGetField to support map, struct, and array types.
TODO: add `apply` in Scala and `__getitem__` in Python, and unify the `getItem` and `getField` methods into a single API (or should we keep them for compatibility?).

Author: Wenchen Fan <[email protected]>

Closes apache#5744 from cloud-fan/generalize and squashes the following commits:

715c589 [Wenchen Fan] address comments
7ea5b31 [Wenchen Fan] fix python test
4f0833a [Wenchen Fan] add python test
f515d69 [Wenchen Fan] add apply method and test cases
8df6199 [Wenchen Fan] fix python test
239730c [Wenchen Fan] fix test compile
2a70526 [Wenchen Fan] use _bin_op in dataframe.py
6bf72bc [Wenchen Fan] address comments
3f880c3 [Wenchen Fan] add java doc
ab35ab5 [Wenchen Fan] fix python test
b5961a9 [Wenchen Fan] fix style
c9d85f5 [Wenchen Fan] generalize UnresolvedGetField to support all map, struct, and array
cloud-fan authored and marmbrus committed May 8, 2015
commit 2d05f325dc3c70349bd17ed399897f22d967c687
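For context, here is a minimal PySpark sketch of the behavior this commit enables, mirroring the new test_field_accessor test in the diff below. It assumes a running SparkContext bound to sc, as in the PySpark shell; nothing here is part of the commit itself.

from pyspark.sql import Row

# One row with an array, a struct, and a map column.
df = sc.parallelize([Row(l=[1], r=Row(a=1, b="b"), d={"k": "v"})]).toDF()

# __getitem__ now routes through the unified "apply" accessor, so the
# same [] syntax works across all three container types.
df.select(df.l[0]).first()[0]    # 1   (array index)
df.select(df.r["a"]).first()[0]  # 1   (struct field)
df.select(df.d["k"]).first()[0]  # 'v' (map key)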
24 changes: 12 additions & 12 deletions python/pyspark/sql/dataframe.py
@@ -1275,7 +1275,7 @@ def __init__(self, jc):
 
     # container operators
     __contains__ = _bin_op("contains")
-    __getitem__ = _bin_op("getItem")
+    __getitem__ = _bin_op("apply")
 
     # bitwise operators
     bitwiseOR = _bin_op("bitwiseOR")
@@ -1308,19 +1308,19 @@ def getField(self, name):
         >>> from pyspark.sql import Row
         >>> df = sc.parallelize([Row(r=Row(a=1, b="b"))]).toDF()
         >>> df.select(df.r.getField("b")).show()
-        +---+
-        |r.b|
-        +---+
-        |  b|
-        +---+
+        +----+
+        |r[b]|
+        +----+
+        |   b|
+        +----+
         >>> df.select(df.r.a).show()
-        +---+
-        |r.a|
-        +---+
-        |  1|
-        +---+
+        +----+
+        |r[a]|
+        +----+
+        |   1|
+        +----+
         """
-        return Column(self._jc.getField(name))
+        return self[name]
 
     def __getattr__(self, item):
         if item.startswith("__"):
7 changes: 7 additions & 0 deletions python/pyspark/sql/tests.py
@@ -519,6 +519,13 @@ def test_access_nested_types(self):
         self.assertEqual("v", df.select(df.d["k"]).first()[0])
         self.assertEqual("v", df.select(df.d.getItem("k")).first()[0])
 
+    def test_field_accessor(self):
+        df = self.sc.parallelize([Row(l=[1], r=Row(a=1, b="b"), d={"k": "v"})]).toDF()
+        self.assertEqual(1, df.select(df.l[0]).first()[0])
+        self.assertEqual(1, df.select(df.r["a"]).first()[0])
+        self.assertEqual("b", df.select(df.r["b"]).first()[0])
+        self.assertEqual("v", df.select(df.d["k"]).first()[0])
+
     def test_infer_long_type(self):
         longrow = [Row(f1='a', f2=100000000000000)]
         df = self.sc.parallelize(longrow).toDF()
@@ -375,9 +375,9 @@ class SqlParser extends AbstractSparkSQLParser with DataTypeParser {
   protected lazy val primary: PackratParser[Expression] =
     ( literal
     | expression ~ ("[" ~> expression <~ "]") ^^
-        { case base ~ ordinal => GetItem(base, ordinal) }
+        { case base ~ ordinal => UnresolvedExtractValue(base, ordinal) }
     | (expression <~ ".") ~ ident ^^
-        { case base ~ fieldName => UnresolvedGetField(base, fieldName) }
+        { case base ~ fieldName => UnresolvedExtractValue(base, Literal(fieldName)) }
     | cast
     | "(" ~> expression <~ ")"
     | function
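The two rewritten parser alternatives above mean that, in SQL text, both bracket and dot syntax now funnel into UnresolvedExtractValue: expr[expr] via the first rule, and expr.ident via the second, with the field name wrapped in a Literal. A hedged PySpark sketch; the temp table name t is an illustrative assumption, reusing the df from the earlier example and the sqlContext from the PySpark shell:

df.registerTempTable("t")

# l[0]   takes the bracket rule -> UnresolvedExtractValue(l, 0)
# r.a    takes the dot rule     -> UnresolvedExtractValue(r, Literal("a"))
# d['k'] takes the bracket rule against the map column
sqlContext.sql("SELECT l[0], r.a, d['k'] FROM t").first()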
@@ -348,8 +348,8 @@ class Analyzer(
           withPosition(u) { q.resolveChildren(nameParts, resolver).getOrElse(u) }
         logDebug(s"Resolving $u to $result")
         result
-      case UnresolvedGetField(child, fieldName) if child.resolved =>
-        GetField(child, fieldName, resolver)
+      case UnresolvedExtractValue(child, fieldExpr) if child.resolved =>
+        ExtractValue(child, fieldExpr, resolver)
     }
   }
 
@@ -184,7 +184,17 @@ case class ResolvedStar(expressions: Seq[NamedExpression]) extends Star {
   override def toString: String = expressions.mkString("ResolvedStar(", ", ", ")")
 }
 
-case class UnresolvedGetField(child: Expression, fieldName: String) extends UnaryExpression {
+/**
+ * Extracts a value or values from an Expression
+ *
+ * @param child The expression to extract value from,
+ *              can be Map, Array, Struct or array of Structs.
+ * @param extraction The expression to describe the extraction,
+ *                   can be key of Map, index of Array, field name of Struct.
+ */
+case class UnresolvedExtractValue(child: Expression, extraction: Expression)
+  extends UnaryExpression {
+
   override def dataType: DataType = throw new UnresolvedException(this, "dataType")
   override def foldable: Boolean = throw new UnresolvedException(this, "foldable")
   override def nullable: Boolean = throw new UnresolvedException(this, "nullable")
@@ -193,5 +203,5 @@ case class UnresolvedGetField(child: Expression, fieldName: String) extends UnaryExpression {
   override def eval(input: Row = null): EvaluatedType =
     throw new TreeNodeException(this, s"No function to evaluate expression. type: ${this.nodeName}")
 
-  override def toString: String = s"$child.$fieldName"
+  override def toString: String = s"$child[$extraction]"
 }
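This toString change appears to be what surfaces in the PySpark docstring diff above: with getField and __getitem__ both producing an UnresolvedExtractValue, the generated column headers render as r[b] rather than r.b.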
@@ -22,7 +22,7 @@ import java.sql.{Date, Timestamp}
 import scala.language.implicitConversions
 import scala.reflect.runtime.universe.{TypeTag, typeTag}
 
-import org.apache.spark.sql.catalyst.analysis.{EliminateSubQueries, UnresolvedGetField, UnresolvedAttribute}
+import org.apache.spark.sql.catalyst.analysis.{EliminateSubQueries, UnresolvedExtractValue, UnresolvedAttribute}
 import org.apache.spark.sql.catalyst.expressions._
 import org.apache.spark.sql.catalyst.plans.logical._
 import org.apache.spark.sql.catalyst.plans.{Inner, JoinType}
@@ -100,8 +100,9 @@ package object dsl {
     def isNull: Predicate = IsNull(expr)
     def isNotNull: Predicate = IsNotNull(expr)
 
-    def getItem(ordinal: Expression): Expression = GetItem(expr, ordinal)
-    def getField(fieldName: String): UnresolvedGetField = UnresolvedGetField(expr, fieldName)
+    def getItem(ordinal: Expression): UnresolvedExtractValue = UnresolvedExtractValue(expr, ordinal)
+    def getField(fieldName: String): UnresolvedExtractValue =
+      UnresolvedExtractValue(expr, Literal(fieldName))
 
     def cast(to: DataType): Expression = Cast(expr, to)
 
@@ -0,0 +1,206 @@
/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements. See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License. You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.spark.sql.catalyst.expressions

import scala.collection.Map

import org.apache.spark.sql.AnalysisException
import org.apache.spark.sql.catalyst.analysis._
import org.apache.spark.sql.types._

object ExtractValue {
  /**
   * Returns the resolved `ExtractValue`. It will return one kind of concrete `ExtractValue`,
   * depend on the type of `child` and `extraction`.
   *
   *    `child`     |    `extraction`    |    concrete `ExtractValue`
   * ----------------------------------------------------------------
   *    Struct      |   Literal String   |        GetStructField
   * Array[Struct]  |   Literal String   |     GetArrayStructFields
   *    Array       |   Integral type    |         GetArrayItem
   *     Map        |      Any type      |         GetMapValue
   */
  def apply(
      child: Expression,
      extraction: Expression,
      resolver: Resolver): ExtractValue = {

    (child.dataType, extraction) match {
      case (StructType(fields), Literal(fieldName, StringType)) =>
        val ordinal = findField(fields, fieldName.toString, resolver)
        GetStructField(child, fields(ordinal), ordinal)
      case (ArrayType(StructType(fields), containsNull), Literal(fieldName, StringType)) =>
        val ordinal = findField(fields, fieldName.toString, resolver)
        GetArrayStructFields(child, fields(ordinal), ordinal, containsNull)
      case (_: ArrayType, _) if extraction.dataType.isInstanceOf[IntegralType] =>
        GetArrayItem(child, extraction)
      case (_: MapType, _) =>
        GetMapValue(child, extraction)
      case (otherType, _) =>
        val errorMsg = otherType match {
          case StructType(_) | ArrayType(StructType(_), _) =>
            s"Field name should be String Literal, but it's $extraction"
          case _: ArrayType =>
            s"Array index should be integral type, but it's ${extraction.dataType}"
          case other =>
            s"Can't extract value from $child"
        }
        throw new AnalysisException(errorMsg)
    }
  }

  def unapply(g: ExtractValue): Option[(Expression, Expression)] = {
    g match {
      case o: ExtractValueWithOrdinal => Some((o.child, o.ordinal))
      case _ => Some((g.child, null))
    }
  }

  /**
   * Find the ordinal of StructField, report error if no desired field or over one
   * desired fields are found.
   */
  private def findField(fields: Array[StructField], fieldName: String, resolver: Resolver): Int = {
    val checkField = (f: StructField) => resolver(f.name, fieldName)
    val ordinal = fields.indexWhere(checkField)
    if (ordinal == -1) {
      throw new AnalysisException(
        s"No such struct field $fieldName in ${fields.map(_.name).mkString(", ")}")
    } else if (fields.indexWhere(checkField, ordinal + 1) != -1) {
      throw new AnalysisException(
        s"Ambiguous reference to fields ${fields.filter(checkField).mkString(", ")}")
    } else {
      ordinal
    }
  }
}

trait ExtractValue extends UnaryExpression {
  self: Product =>

  type EvaluatedType = Any
}

/**
 * Returns the value of fields in the Struct `child`.
 */
case class GetStructField(child: Expression, field: StructField, ordinal: Int)
  extends ExtractValue {

  override def dataType: DataType = field.dataType
  override def nullable: Boolean = child.nullable || field.nullable
  override def foldable: Boolean = child.foldable
  override def toString: String = s"$child.${field.name}"

  override def eval(input: Row): Any = {
    val baseValue = child.eval(input).asInstanceOf[Row]
    if (baseValue == null) null else baseValue(ordinal)
  }
}

/**
 * Returns the array of value of fields in the Array of Struct `child`.
 */
case class GetArrayStructFields(
    child: Expression,
    field: StructField,
    ordinal: Int,
    containsNull: Boolean) extends ExtractValue {

  override def dataType: DataType = ArrayType(field.dataType, containsNull)
  override def nullable: Boolean = child.nullable
  override def foldable: Boolean = child.foldable
  override def toString: String = s"$child.${field.name}"

  override def eval(input: Row): Any = {
    val baseValue = child.eval(input).asInstanceOf[Seq[Row]]
    if (baseValue == null) null else {
      baseValue.map { row =>
        if (row == null) null else row(ordinal)
      }
    }
  }
}

abstract class ExtractValueWithOrdinal extends ExtractValue {
  self: Product =>

  def ordinal: Expression

  /** `Null` is returned for invalid ordinals. */
  override def nullable: Boolean = true
  override def foldable: Boolean = child.foldable && ordinal.foldable
  override def toString: String = s"$child[$ordinal]"
  override def children: Seq[Expression] = child :: ordinal :: Nil

  override def eval(input: Row): Any = {
    val value = child.eval(input)
    if (value == null) {
      null
    } else {
      val o = ordinal.eval(input)
      if (o == null) {
        null
      } else {
        evalNotNull(value, o)
      }
    }
  }

  protected def evalNotNull(value: Any, ordinal: Any): Any
}

/**
 * Returns the field at `ordinal` in the Array `child`
 */
case class GetArrayItem(child: Expression, ordinal: Expression)
  extends ExtractValueWithOrdinal {

  override def dataType: DataType = child.dataType.asInstanceOf[ArrayType].elementType

  override lazy val resolved = childrenResolved &&
    child.dataType.isInstanceOf[ArrayType] && ordinal.dataType.isInstanceOf[IntegralType]

  protected def evalNotNull(value: Any, ordinal: Any) = {
    // TODO: consider using Array[_] for ArrayType child to avoid
    // boxing of primitives
    val baseValue = value.asInstanceOf[Seq[_]]
    val index = ordinal.asInstanceOf[Int]
    if (index >= baseValue.size || index < 0) {
      null
    } else {
      baseValue(index)
    }
  }
}

/**
 * Returns the value of key `ordinal` in Map `child`
 */
case class GetMapValue(child: Expression, ordinal: Expression)
  extends ExtractValueWithOrdinal {

  override def dataType: DataType = child.dataType.asInstanceOf[MapType].valueType

  override lazy val resolved = childrenResolved && child.dataType.isInstanceOf[MapType]

  protected def evalNotNull(value: Any, ordinal: Any) = {
    val baseValue = value.asInstanceOf[Map[Any, _]]
    baseValue.get(ordinal).orNull
  }
}
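To connect the dispatch table in ExtractValue.apply with the user-facing API, a short hedged sketch from the Python side, reusing the df assumed in the earlier example; the error text comes from the AnalysisException branches in this file:

df.select(df.r["b"])  # Struct + string literal -> GetStructField
df.select(df.l[0])    # Array  + integral type  -> GetArrayItem
df.select(df.d["k"])  # Map    + any key type   -> GetMapValue

# A non-integral array index fails analysis, e.g. df.select(df.l["x"])
# raises AnalysisException: "Array index should be integral type, ..."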