Closed
Changes from 1 commit
36 commits
7bf45dd
Adds zip function to sparksql
DylanGuedes Apr 11, 2018
99848fe
Changes zip construction
DylanGuedes Apr 13, 2018
27b0bc2
Changes tests and uses builtin namespace in pyspark
DylanGuedes Apr 13, 2018
93826b6
fixes examples string and uses struct instead of arrays
DylanGuedes Apr 26, 2018
a7e29f6
working pyspark zip_lists
DylanGuedes May 11, 2018
7130fec
Fixes java version when arrays have different lengths
DylanGuedes May 11, 2018
d552216
remove unused variables
DylanGuedes May 11, 2018
1fecef4
rename zip_lists to zip
DylanGuedes May 11, 2018
f71151a
adds expression tests and uses strip margin syntax
DylanGuedes May 12, 2018
6b4bc94
Adds variable number of inputs to zip function
DylanGuedes May 15, 2018
1549928
uses foldleft instead of while for iterating
DylanGuedes May 15, 2018
9f7bba1
rewritten some notation
DylanGuedes May 16, 2018
3ba2b4f
fix dogencode generation
DylanGuedes May 17, 2018
3a59201
Adds new tests, uses lazy val and split calls
DylanGuedes May 17, 2018
6462fa8
uses splitFunction
DylanGuedes May 17, 2018
8b1eb7c
move arraytypes to private member
DylanGuedes May 18, 2018
2bfba80
adds binary and array of array tests
DylanGuedes May 18, 2018
c3b062c
uses stored array types names
DylanGuedes May 18, 2018
d9b95c4
split input function using ctxsplitexpression
DylanGuedes May 18, 2018
26bbf66
uses splitexpression for inputs
DylanGuedes May 19, 2018
d9ad04d
Refactor cases, add new tests with empty seq, check size of array
DylanGuedes May 22, 2018
f29ee1c
Check empty seq as input
DylanGuedes May 22, 2018
c58d09c
Uses switch instead of if
DylanGuedes May 23, 2018
38fa996
refactor switch and else methods
DylanGuedes May 23, 2018
5b3066b
uses if instead of switch
DylanGuedes May 30, 2018
759a4d4
Not using storedarrtype anymore
DylanGuedes Jun 4, 2018
68e69db
split between empty and nonempty codegen
DylanGuedes Jun 4, 2018
12b3835
remove ternary if
DylanGuedes Jun 4, 2018
643cb9b
Fixes null values evaluation and adds back tests
DylanGuedes Jun 4, 2018
5876082
move to else
DylanGuedes Jun 4, 2018
0223960
remove unused lines
DylanGuedes Jun 4, 2018
2b88387
use zip alias
DylanGuedes Jun 5, 2018
bbc20ee
using same docs for all apis
DylanGuedes Jun 8, 2018
8d3a838
adds transient to method
DylanGuedes Jun 8, 2018
d8f3dea
rename zip function to arrays_zip
DylanGuedes Jun 10, 2018
3d68ea9
adds pretty_name for arrays_zip
DylanGuedes Jun 11, 2018
use zip alias
Signed-off-by: DylanGuedes <[email protected]>
DylanGuedes committed Jun 5, 2018
commit 2b883879b8efd0d514553612cb2918617bb5044b
10 changes: 5 additions & 5 deletions python/pyspark/sql/functions.py
@@ -350,7 +350,7 @@ def corr(col1, col2):

>>> a = range(20)
>>> b = [2 * x for x in range(20)]
- >>> df = spark.createDataFrame(__builtin__.zip(a, b), ["a", "b"])
+ >>> df = spark.createDataFrame(zip(a, b), ["a", "b"])
>>> df.agg(corr("a", "b").alias('c')).collect()
[Row(c=1.0)]
"""
@@ -364,7 +364,7 @@ def covar_pop(col1, col2):

>>> a = [1] * 10
>>> b = [1] * 10
- >>> df = spark.createDataFrame(__builtin__.zip(a, b), ["a", "b"])
+ >>> df = spark.createDataFrame(zip(a, b), ["a", "b"])
>>> df.agg(covar_pop("a", "b").alias('c')).collect()
[Row(c=0.0)]
"""
@@ -2402,10 +2402,10 @@ def zip(*cols):

:param cols: columns in input
Member

nit: columns of arrays to be merged.


- >>> from pyspark.sql.functions import zip
+ >>> from pyspark.sql.functions import zip as spark_zip
Contributor

I am not sure about this. What do you think, @ueshin?

Member

Hmm, I was thinking about the naming issue actually. Should we use arrays_zip, zip_arrays or something instead of zip? Or any suggestions?

Contributor Author

I think we should stick with something related to zip (such as "zip_arrays" or "zip_lists") so the name stays consistent with other APIs and languages (Enum.zip in Elixir and zip in Scala, for instance).

Member
@ueshin Jun 7, 2018

I think arrays_zip should be fine.
Do you have any ideas? @BryanCutler @HyukjinKwon

Member

Eh, I think zip is fine (like sum, min or max). I agree it's bad practice to shadow builtin functions, but we have already gone down this path ...

Member

arrays_zip is fine with me too. I'm only clarifying in case you feel hesitant about going ahead as is.

Member

Let's use arrays_zip.
As for min and max, we use array_min and array_max for the array functions, as distinct from the aggregate functions. I think arrays_zip is consistent with those names.

@DylanGuedes Would you mind changing the name to arrays_zip? Thanks.

Member

Ah! I thought we were talking about shadowing the builtin function. Sure, that's better.
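
The shadowing concern raised in this thread can be reproduced without Spark. The following is a minimal sketch, assuming a hypothetical stand-in function named `zip` (it is not the PySpark implementation). Note that in Python 3 the builtin lives in the `builtins` module rather than `__builtin__`.

```python
import builtins


def zip(*cols):
    """Hypothetical stand-in for a library function that happens to be named `zip`."""
    return ("zip-expression", cols)


a = range(3)
b = [2 * x for x in a]

# The bare name now resolves to the stand-in, not the builtin.
shadowed = zip(a, b)

# The builtin is still reachable through its module.
pairs = list(builtins.zip(a, b))

# Binding the stand-in to a second name (the effect of `import ... as spark_zip`)
# lets call sites state which function they mean.
spark_zip = zip
aliased = spark_zip(a, b)
```

Renaming the function to `arrays_zip`, as requested in the thread, avoids the collision entirely, so neither the alias nor the explicit module prefix is needed.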

>>> df = spark.createDataFrame([(([1, 2, 3], [2, 3, 4]))], ['vals1', 'vals2'])
- >>> df.select(zip(df.vals1, df.vals2).alias('zipped')).collect()
- [Row(zipped=[1, 2]), Row(zipped=[2, 3]), Row(zipped=[3, 4])]
+ >>> df.select(spark_zip(df.vals1, df.vals2).alias('zipped')).collect()
+ [Row(zipped=[Row(vals1=1, vals2=2), Row(vals1=2, vals2=3), Row(vals1=3, vals2=4)])]
"""
sc = SparkContext._active_spark_context
return Column(sc._jvm.functions.zip(_to_seq(sc, cols, _to_java_column)))
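
To make the corrected doctest output concrete, here is a plain-Python model of the zipped result. It is a sketch only: `arrays_zip_model` is a hypothetical helper, rows are represented as dicts rather than Spark `Row` objects, and padding shorter arrays with `None` is an assumption based on the generated code in this PR, which writes `null` once an index runs past the end of an array.

```python
from itertools import zip_longest


def arrays_zip_model(names, *arrays):
    """Merge arrays element-wise into one list of records (hypothetical model)."""
    # zip_longest pads exhausted arrays with None, mirroring the null fill.
    return [dict(zip(names, values)) for values in zip_longest(*arrays)]


# Same input as the doctest: one row holding two arrays of equal length.
zipped = arrays_zip_model(["vals1", "vals2"], [1, 2, 3], [2, 3, 4])

# Arrays of different lengths: the shorter one contributes None.
uneven = arrays_zip_model(["vals1", "vals2"], [1, 2, 3], [2])
```

The point of the doctest change is visible here: the result is a single array of structs per input row, not one row per zipped pair.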
@@ -156,30 +156,29 @@ case class Zip(children: Seq[Expression]) extends Expression with ExpectsInputTy
case ((expr: NamedExpression, elementType), _) =>
StructField(expr.name, elementType, nullable = true)
case ((_, elementType), idx) =>
- StructField(s"$idx", elementType, nullable = true)
+ StructField(idx.toString, elementType, nullable = true)
}
Member

Do we need to make List explicitly? How about:

val fields = arrayTypes.zipWithIndex.map { case (arr, idx) =>
  StructField( ... )
}

?

Contributor Author

Nice, thank you.

Member

How about:

val fields = children.zip(arrayElementTypes).zipWithIndex.map {
  case ((expr: NamedExpression, elementType), _) =>
    StructField(expr.name, elementType, nullable = true)
  case ((_, elementType), idx) =>
    StructField(s"$idx", elementType, nullable = true)
}

?

Contributor Author

Way better, thanks!
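
The naming rule in the suggested pattern match can be illustrated in Python. This is a sketch under assumptions: `field_names` is a hypothetical helper, and each child is modelled as its name (a string) when it is a named expression, or `None` otherwise.

```python
def field_names(child_names):
    """Return struct field names: the child's own name if present, else its position."""
    return [
        name if name is not None else str(idx)
        for idx, name in enumerate(child_names)
    ]


# The unnamed second child falls back to its index within the children.
names = field_names(["vals1", None, "vals3"])
```

As in the Scala version, the fallback index counts all children, so an unnamed child in second position is called `1` regardless of how many named children precede it.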

StructType(fields)
}

- val numberOfArrays: Int = children.length
+ @transient lazy val numberOfArrays: Int = children.length

- def emptyInputGenCode(ev: ExprCode): ExprCode = {
- val genericArrayData = classOf[GenericArrayData].getName
+ @transient lazy val genericArrayData = classOf[GenericArrayData].getName

+ def emptyInputGenCode(ev: ExprCode): ExprCode = {
ev.copy(code"""
|${CodeGenerator.javaType(dataType)} ${ev.value} = new $genericArrayData(new Object[0]);
|boolean ${ev.isNull} = false;
""".stripMargin)
}

def nonEmptyInputGenCode(ctx: CodegenContext, ev: ExprCode): ExprCode = {
- val genericArrayData = classOf[GenericArrayData].getName
val genericInternalRow = classOf[GenericInternalRow].getName
val arrVals = ctx.freshName("arrVals")
val arrCardinality = ctx.freshName("arrCardinality")
val biggestCardinality = ctx.freshName("biggestCardinality")

- val myobject = ctx.freshName("myobject")
+ val currentRow = ctx.freshName("currentRow")
val j = ctx.freshName("j")
val i = ctx.freshName("i")
val args = ctx.freshName("args")
@@ -218,11 +217,11 @@ case class Zip(children: Seq[Expression]) extends Expression with ExpectsInputTy
val getValueForType = arrayElementTypes.zipWithIndex.map { case (eleType, idx) =>
val g = CodeGenerator.getValue(s"$arrVals[$idx]", eleType, i)
s"""
- |if ($i < $arrCardinality[$idx] && !$arrVals[$idx].isNullAt($i)) {
- | $myobject[$idx] = $g;
- |} else {
- | $myobject[$idx] = null;
- |}
+ |if ($i < $arrCardinality[$idx] && !$arrVals[$idx].isNullAt($i)) {
Member
@viirya Jun 9, 2018

Looks like arrCardinality is only used here. We can write $arrVals[$idx].numElements() and remove arrCardinality. It simplifies the arguments of two splitExpressions.

+ | $currentRow[$idx] = $g;
+ |} else {
+ | $currentRow[$idx] = null;
+ |}
""".stripMargin
}

@@ -231,7 +230,7 @@ case class Zip(children: Seq[Expression]) extends Expression with ExpectsInputTy
funcName = "extractValue",
arguments =
("int", i) ::
- ("Object[]", myobject) ::
+ ("Object[]", currentRow) ::
("int[]", arrCardinality) ::
("ArrayData[]", arrVals) :: Nil)

@@ -249,9 +248,9 @@ case class Zip(children: Seq[Expression]) extends Expression with ExpectsInputTy
|if (!${ev.isNull}) {
| Object[] $args = new Object[$biggestCardinality];
Member

if (!${ev.isNull}) {
   Object[] $args = ...

We usually don't set a value if the result is null.

| for (int $i = 0; $i < $biggestCardinality; $i ++) {
- | Object[] $myobject = new Object[$numberOfArrays];
+ | Object[] $currentRow = new Object[$numberOfArrays];
| $getValueForTypeSplitted
- | $args[$i] = new $genericInternalRow($myobject);
+ | $args[$i] = new $genericInternalRow($currentRow);
| }
| ${ev.value} = new $genericArrayData($args);
|}
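
The control flow of the generated Java can be mirrored in a few lines of Python. This is a rough model, not the generated code: `zip_rows` and its `is_null` flag are hypothetical names, tuples stand in for `GenericInternalRow`, and the length check is done inline, as suggested in the review, instead of through a separate cardinality array.

```python
def zip_rows(arrays, is_null=False):
    """Model of the row-building loop (hypothetical helper)."""
    if is_null:
        # No result array is allocated when the result is null.
        return None
    biggest_cardinality = max((len(arr) for arr in arrays), default=0)
    args = [None] * biggest_cardinality
    for i in range(biggest_cardinality):
        # A fresh row per index, one slot per input array.
        current_row = [None] * len(arrays)
        for idx, arr in enumerate(arrays):
            # Inline length check replaces a precomputed cardinality array.
            if i < len(arr) and arr[i] is not None:
                current_row[idx] = arr[i]
        args[i] = tuple(current_row)
    return args


rows = zip_rows([[1, 2, 3], [10, None]])
```

Slots stay `None` both when an element is itself null and when the index is past the end of a shorter array, which matches the two halves of the `if` condition in the template.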