Closed
Changes from 1 commit
Commits
41 commits
2e0b308
initial commit of cogroup
d80tb7 Jun 20, 2019
64ff5ac
minor tidy up
d80tb7 Jun 20, 2019
6d039e3
removed incorrect test
d80tb7 Jun 21, 2019
d8a5c5d
tidies up test, fixed output cols
d80tb7 Jun 25, 2019
73188f6
removed incorrect file
d80tb7 Jun 25, 2019
690fa14
Revert: removed incorrect test
d80tb7 Jun 25, 2019
c86b2bf
Merge branch 'master' of https://github.com/d80tb7/spark into SPARK-2…
d80tb7 Jun 25, 2019
e3b66ac
fix for resolving key cols
d80tb7 Jun 25, 2019
8007fa6
common trait for grouped pandas udfs
d80tb7 Jun 27, 2019
d4cf6d0
poc using arrow streams
d80tb7 Jun 27, 2019
87aeb92
more unit tests for cogroup
d80tb7 Jun 27, 2019
e7528d0
argspec includes grouping key
d80tb7 Jul 2, 2019
b85ec75
fixed tests und
d80tb7 Jul 2, 2019
6a8ecff
keys now handled properly. Validation of udf. More tests
d80tb7 Jul 2, 2019
d2da787
formatting
d80tb7 Jul 2, 2019
7321141
fixed scalastyle errors
d80tb7 Jul 2, 2019
6bbe31c
updated grouped map to new args format
d80tb7 Jul 2, 2019
b444ff7
Merge branch 'master' of https://github.com/apache/spark into SPARK-2…
d80tb7 Jul 2, 2019
94be574
some code review fixes
d80tb7 Jul 11, 2019
9241639
Merge branch 'master' of https://github.com/apache/spark into SPARK-2…
d80tb7 Jul 11, 2019
3de551f
more code review fixes
d80tb7 Jul 11, 2019
300b53a
more code review fixes
d80tb7 Jul 11, 2019
7d161ba
fix comment on PandasCogroupSerializer
d80tb7 Jul 11, 2019
d1a6366
formatting
d80tb7 Jul 11, 2019
a201161
Merge branch 'master' of https://github.com/apache/spark into SPARK-2…
d80tb7 Jul 19, 2019
3e4bc95
python style fixes
d80tb7 Jul 19, 2019
307e664
added doc
d80tb7 Jul 19, 2019
7558b8d
Merge branch 'master' of https://github.com/apache/spark into SPARK-2…
d80tb7 Jul 23, 2019
19360c4
minor formatting
d80tb7 Jul 23, 2019
28493b4
a couple more unit tests
d80tb7 Jul 23, 2019
d6d11e4
minor formatting
d80tb7 Jul 23, 2019
a62a1e3
more doc
d80tb7 Jul 25, 2019
ec78284
added comment to cogroup func
d80tb7 Jul 25, 2019
1a9ff58
fixed python style
d80tb7 Jul 25, 2019
c0d2919
review comments
d80tb7 Aug 20, 2019
4cd5c70
review comments scala
d80tb7 Aug 20, 2019
e025375
Merge branch 'master' of https://github.com/apache/spark into SPARK-2…
d80tb7 Aug 20, 2019
dd1ffaf
python formatting
d80tb7 Aug 20, 2019
733b592
review comments (mainly formatting)
d80tb7 Sep 8, 2019
51dcbdc
Merge branch 'master' of https://github.com/apache/spark into SPARK-2…
d80tb7 Sep 8, 2019
1b966fd
couple more format changes
d80tb7 Sep 15, 2019
review comments (mainly formatting)
d80tb7 committed Sep 8, 2019
commit 733b59277b51d36e4640d3ea0ff3e097aa301294
7 changes: 2 additions & 5 deletions python/pyspark/serializers.py
@@ -403,12 +403,9 @@ def __repr__(self):

class CogroupUDFSerializer(ArrowStreamPandasUDFSerializer):

def __init__(self, timezone, safecheck, assign_cols_by_name):
super(CogroupUDFSerializer, self).__init__(timezone, safecheck, assign_cols_by_name)

def load_stream(self, stream):
"""
Deserialize Cogrouped ArrowRecordBatches to a tuple of Arrow tables and return as a two
Deserialize Cogrouped ArrowRecordBatches to a tuple of Arrow tables and yield as two
lists of pandas.Series.
"""
import pyarrow as pa
@@ -427,7 +424,7 @@ def load_stream(self, stream):

elif dataframes_in_group != 0:
raise ValueError(
'Invalid number of dataframes in group {0}'.format(dataframes_in_group))
'Invalid number of pandas.DataFrames in group {0}'.format(dataframes_in_group))


class BatchedSerializer(Serializer):
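For context, the protocol CogroupUDFSerializer.load_stream consumes is: an integer giving the number of DataFrames in the next cogroup (2 for a left/right pair, 0 as the end-of-stream marker), followed by one Arrow stream per DataFrame. A minimal sketch of that loop, assuming the read_int helper and the ArrowStreamSerializer / arrow_to_pandas machinery already present in pyspark.serializers (illustrative, not the verbatim PR code):

```python
def load_stream(self, stream):
    import pyarrow as pa
    dataframes_in_group = None
    while dataframes_in_group is None or dataframes_in_group > 0:
        dataframes_in_group = read_int(stream)
        if dataframes_in_group == 2:
            # Each side of the cogroup arrives as its own Arrow stream; convert
            # every column of each side into a pandas.Series.
            batches1 = list(ArrowStreamSerializer.load_stream(self, stream))
            batches2 = list(ArrowStreamSerializer.load_stream(self, stream))
            yield (
                [self.arrow_to_pandas(c) for c in pa.Table.from_batches(batches1).itercolumns()],
                [self.arrow_to_pandas(c) for c in pa.Table.from_batches(batches2).itercolumns()])
        elif dataframes_in_group != 0:
            raise ValueError(
                'Invalid number of pandas.DataFrames in group {0}'.format(dataframes_in_group))
```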
13 changes: 6 additions & 7 deletions python/pyspark/sql/cogroup.py
@@ -23,14 +23,13 @@

class CoGroupedData(object):
"""
A logical grouping of two :class:`GroupedData`,
created by :func:`GroupedData.cogroup`.
A logical grouping of two :class:`GroupedData`,
created by :func:`GroupedData.cogroup`.

.. note:: Experimental
.. note:: Experimental

.. versionadded:: 3.0

"""
.. versionadded:: 3.0
"""

def __init__(self, gd1, gd2):
self._gd1 = gd1
@@ -53,7 +52,7 @@ def apply(self, udf):

.. note:: This function requires a full shuffle. All the data of a cogroup will be loaded
into memory, so the user should be aware of the potential OOM risk if data is skewed
and certain goroups are too large to fit in memory.
and certain groups are too large to fit in memory.

.. note:: Experimental

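To make the API documented in cogroup.py concrete, a hedged usage sketch, assuming the COGROUPED_MAP pandas UDF type this PR introduces and an active SparkSession named spark (the data and schema are purely illustrative):

```python
import pandas as pd
from pyspark.sql.functions import pandas_udf, PandasUDFType

df1 = spark.createDataFrame(
    [(20000101, 1, 1.0), (20000101, 2, 2.0), (20000102, 1, 3.0)],
    ("time", "id", "v1"))
df2 = spark.createDataFrame(
    [(20000101, 1, "x"), (20000101, 2, "y")],
    ("time", "id", "v2"))

@pandas_udf("time int, id int, v1 double, v2 string", PandasUDFType.COGROUPED_MAP)
def asof_join(left, right):
    # Each invocation receives one cogroup: every row of df1 and df2 that shares an id.
    return pd.merge_asof(left, right, on="time", by="id")

df1.groupby("id").cogroup(df2.groupby("id")).apply(asof_join).show()
```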
7 changes: 1 addition & 6 deletions python/pyspark/sql/tests/test_pandas_udf_cogrouped_map.py
@@ -15,16 +15,11 @@
# limitations under the License.
#

import datetime
import unittest
import sys

from collections import OrderedDict
from decimal import Decimal

from pyspark.sql import Row
from pyspark.sql.functions import array, explode, col, lit, udf, sum, pandas_udf, PandasUDFType
from pyspark.sql.types import *
from pyspark.sql.types import DoubleType, StructType, StructField
from pyspark.testing.sqlutils import ReusedSQLTestCase, have_pandas, have_pyarrow, \
pandas_requirement_message, pyarrow_requirement_message
from pyspark.testing.utils import QuietTest
52 changes: 27 additions & 25 deletions python/pyspark/worker.py
@@ -311,11 +311,11 @@ def read_udfs(pickleSer, infile, eval_type):
"spark.sql.legacy.execution.pandas.groupedMap.assignColumnsByName", "true")\
.lower() == "true"

# Scalar Pandas UDF handles struct type arguments as pandas DataFrames instead of
# pandas Series. See SPARK-27240.
if eval_type == PythonEvalType.SQL_COGROUPED_MAP_PANDAS_UDF:
ser = CogroupUDFSerializer(timezone, safecheck, assign_cols_by_name)
else:
# Scalar Pandas UDF handles struct type arguments as pandas DataFrames instead of
# pandas Series. See SPARK-27240.
df_for_struct = (eval_type == PythonEvalType.SQL_SCALAR_PANDAS_UDF or
eval_type == PythonEvalType.SQL_SCALAR_PANDAS_ITER_UDF or
eval_type == PythonEvalType.SQL_MAP_PANDAS_ITER_UDF)
@@ -377,28 +377,30 @@ def map_batch(batch):
# profiling is not supported for UDF
return func, None, ser, ser

# Helper function to extract the key and value indexs from arg_offsets
# arg_offsets is a List containing the key and value
# indexes of columns of the DataFrames to be passed to the udf.
# It consists of n repeating groups where n is the number of
# DataFrames. Each group has the following format.
# group[0]: length of group
# group[1]: length of key indexes
# group[2.. group[1] +2]: key attributes
# group[group[1] +3 group[0]]: value attributes
# See BasePandasGroupExec.resolveArgOffsets for equivalent scala code
def extract_key_value_indexes():
def extract_key_value_indexes(grouped_arg_offsets):
"""
Helper function to extract the key and value indexes from arg_offsets for the grouped and
cogrouped pandas udfs. See BasePandasGroupExec.resolveArgOffsets for equivalent scala code.

:param grouped_arg_offsets: List containing the key and value indexes of columns of the
DataFrames to be passed to the udf. It consists of n repeating groups where n is the
number of DataFrames. Each group has the following format:
group[0]: length of group
group[1]: length of key indexes
group[2.. group[1] +2]: key attributes
group[group[1] +3 group[0]]: value attributes
"""
parsed = []
i = 0
while i < len(arg_offsets):
offsets_len = arg_offsets[i]
i += 1
offsets = arg_offsets[i: i + offsets_len]
idx = 0
while idx < len(grouped_arg_offsets):
offsets_len = grouped_arg_offsets[idx]
idx += 1
offsets = grouped_arg_offsets[idx: idx + offsets_len]
split_index = offsets[0] + 1
keys = offsets[1: split_index]
values = offsets[split_index:]
parsed.append([keys, values])
i += offsets_len
offset_keys = offsets[1: split_index]
offset_values = offsets[split_index:]
parsed.append([offset_keys, offset_values])
idx += offsets_len
return parsed

udfs = {}
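A small worked example of the offset encoding described in the docstring above, applied to the helper just defined (the concrete offsets are hypothetical):

```python
# Two DataFrames: the first has one key column at offset 0 and value columns at
# offsets 1 and 2; the second has one key column at offset 0 and a value at offset 1.
grouped_arg_offsets = [4, 1, 0, 1, 2,   # group length 4: 1 key index (0), values (1, 2)
                       3, 1, 0, 1]      # group length 3: 1 key index (0), value (1)
extract_key_value_indexes(grouped_arg_offsets)
# -> [[[0], [1, 2]], [[0], [1]]]
```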
@@ -417,7 +419,7 @@ def extract_key_value_indexes():
arg_offsets, udf = read_single_udf(
pickleSer, infile, eval_type, runner_conf, udf_index=0)
udfs['f'] = udf
parsed_offsets = extract_key_value_indexes()
parsed_offsets = extract_key_value_indexes(arg_offsets)
keys = ["a[%d]" % (o,) for o in parsed_offsets[0][0]]
vals = ["a[%d]" % (o, ) for o in parsed_offsets[0][1]]
mapper_str = "lambda a: f([%s], [%s])" % (", ".join(keys), ", ".join(vals))
@@ -428,9 +430,9 @@ def extract_key_value_indexes():
arg_offsets, udf = read_single_udf(
pickleSer, infile, eval_type, runner_conf, udf_index=0)
udfs['f'] = udf
parsed_offsets = extract_key_value_indexes()
parsed_offsets = extract_key_value_indexes(arg_offsets)
df1_keys = ["a[0][%d]" % (o, ) for o in parsed_offsets[0][0]]
df1_vals = ["a[0][%d]" % (o, )for o in parsed_offsets[0][1]]
df1_vals = ["a[0][%d]" % (o, ) for o in parsed_offsets[0][1]]
df2_keys = ["a[1][%d]" % (o, ) for o in parsed_offsets[1][0]]
df2_vals = ["a[1][%d]" % (o, ) for o in parsed_offsets[1][1]]
mapper_str = "lambda a: f([%s], [%s], [%s], [%s])" % (
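Continuing that worked example for the cogrouped branch: with parsed_offsets equal to [[[0], [1, 2]], [[0], [1]]], and assuming the remaining (truncated) format arguments are the joined key and value lists, the generated mapper source would be:

```python
# a is the (series_list_1, series_list_2) tuple yielded by CogroupUDFSerializer.
mapper_str = "lambda a: f([a[0][0]], [a[0][1], a[0][2]], [a[1][0]], [a[1][1]])"
```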
@@ -52,7 +52,7 @@ case class MapInPandas(
}

/**
* Flatmap cogroups using a udf: pandas.Dataframe, pandas.Dataframe -> pandas.Dataframe
* Flatmap cogroups using a udf: pandas.Dataframe, pandas.Dataframe -> pandas.Dataframe
* This is used by DataFrame.groupby().cogroup().apply().
*/
case class FlatMapCoGroupsInPandas(
Expand All @@ -66,7 +66,6 @@ case class FlatMapCoGroupsInPandas(
override val producedAttributes = AttributeSet(output)
}


trait BaseEvalPython extends UnaryNode {

def udfs: Seq[PythonUDF]
@@ -19,12 +19,9 @@ package org.apache.spark.sql.execution.python

import java.io._
import java.net._
import java.util.concurrent.atomic.AtomicBoolean

import scala.collection.JavaConverters._

import org.apache.arrow.vector.VectorSchemaRoot
import org.apache.arrow.vector.ipc.{ArrowStreamReader, ArrowStreamWriter}
import org.apache.arrow.vector.ipc.ArrowStreamWriter

import org.apache.spark._
import org.apache.spark.api.python._
@@ -33,7 +30,6 @@ import org.apache.spark.sql.execution.arrow.ArrowWriter
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.types._
import org.apache.spark.sql.util.ArrowUtils
import org.apache.spark.sql.vectorized.{ArrowColumnVector, ColumnarBatch, ColumnVector}
import org.apache.spark.util.Utils

/**
@@ -40,7 +40,6 @@ abstract class BaseArrowPythonRunner[T](
argOffsets: Array[Array[Int]])
extends BasePythonRunner[T, ColumnarBatch](funcs, evalType, argOffsets) {


protected override def newReaderIterator(
stream: DataInputStream,
writerThread: WriterThread,
@@ -111,5 +110,3 @@
}
}
}


@@ -31,8 +31,10 @@ import org.apache.spark.sql.vectorized.{ArrowColumnVector, ColumnarBatch}
/**
* Base functionality for plans which execute grouped python udfs.
*/
abstract class BasePandasGroupExec(func: Expression,
output: Seq[Attribute]) extends SparkPlan {
abstract class BasePandasGroupExec(
Review comment (Member):
Hey, I think I am pretty against this refactoring. There are multiple duplicated codes in R vectorization too (which I added). I didn't intentionally yet refactor those. Plus, I don't think it's good idea to have both refactoring and feature implementation in one PR.

func: Expression,
output: Seq[Attribute])
extends SparkPlan {

protected val sessionLocalTimeZone = conf.sessionLocalTimeZone

@@ -81,8 +81,11 @@ class CogroupedArrowPythonRunner(
dataOut.writeInt(0)
}

def writeGroup(group: Iterator[InternalRow], schema: StructType, dataOut: DataOutputStream,
name: String) = {
def writeGroup(
group: Iterator[InternalRow],
schema: StructType,
dataOut: DataOutputStream,
name: String) = {
val arrowSchema = ArrowUtils.toArrowSchema(schema, timeZoneId)
val allocator = ArrowUtils.rootAllocator.newChildAllocator(
s"stdout writer for $pythonExec ($name)", 0, Long.MaxValue)
@@ -52,7 +52,7 @@ case class FlatMapCoGroupsInPandasExec(
output: Seq[Attribute],
left: SparkPlan,
right: SparkPlan)
extends BasePandasGroupExec(func, output) with BinaryExecNode{
extends BasePandasGroupExec(func, output) with BinaryExecNode {

override def outputPartitioning: Partitioning = left.outputPartitioning

@@ -72,23 +72,26 @@ case class FlatMapCoGroupsInPandasExec(
val (leftDedup, leftArgOffsets) = resolveArgOffsets(left, leftGroup)
val (rightDedup, rightArgOffsets) = resolveArgOffsets(right, rightGroup)

// Map cogrouped rows to ArrowPythonRunner results, Only execute if partition is not empty
left.execute().zipPartitions(right.execute()) { (leftData, rightData) =>
if (leftData.isEmpty && rightData.isEmpty) Iterator.empty else {

val leftGrouped = groupAndProject(leftData, leftGroup, left.output, leftDedup)
val rightGrouped = groupAndProject(rightData, rightGroup, right.output, rightDedup)
val data = new CoGroupedIterator(leftGrouped, rightGrouped, leftGroup)
.map{case (_, l, r) => (l, r)}
val leftGrouped = groupAndProject(leftData, leftGroup, left.output, leftDedup)
val rightGrouped = groupAndProject(rightData, rightGroup, right.output, rightDedup)
val data = new CoGroupedIterator(leftGrouped, rightGrouped, leftGroup)
.map { case (_, l, r) => (l, r) }

val runner = new CogroupedArrowPythonRunner(
chainedFunc,
PythonEvalType.SQL_COGROUPED_MAP_PANDAS_UDF,
Array(leftArgOffsets ++ rightArgOffsets),
StructType.fromAttributes(leftDedup),
StructType.fromAttributes(rightDedup),
sessionLocalTimeZone,
pythonRunnerConf)
val runner = new CogroupedArrowPythonRunner(
chainedFunc,
PythonEvalType.SQL_COGROUPED_MAP_PANDAS_UDF,
Array(leftArgOffsets ++ rightArgOffsets),
StructType.fromAttributes(leftDedup),
StructType.fromAttributes(rightDedup),
sessionLocalTimeZone,
pythonRunnerConf)

executePython(data, runner)
executePython(data, runner)
}
}
}
}
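Conceptually, FlatMapCoGroupsInPandasExec groups and projects each side, pairs up groups that share a key via CoGroupedIterator, and feeds each pair to the Python runner. A rough pandas-only analogue of that per-partition behaviour (purely illustrative; the names and helpers here are made up and this is not the Spark execution path):

```python
import pandas as pd

def cogroup_apply(left, right, keys, func):
    # Group each side by the key columns, then pair up groups that share a key,
    # substituting an empty frame when one side has no rows for that key.
    left_groups = {k: g for k, g in left.groupby(keys)}
    right_groups = {k: g for k, g in right.groupby(keys)}
    results = []
    for key in sorted(set(left_groups) | set(right_groups)):
        l = left_groups.get(key, left.iloc[0:0])
        r = right_groups.get(key, right.iloc[0:0])
        results.append(func(l, r))
    return pd.concat(results, ignore_index=True)
```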
@@ -17,19 +17,14 @@

package org.apache.spark.sql.execution.python

import scala.collection.JavaConverters._
import scala.collection.mutable.ArrayBuffer

import org.apache.spark.TaskContext
import org.apache.spark.api.python.{ChainedPythonFunctions, PythonEvalType}
import org.apache.spark.api.python.PythonEvalType
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.catalyst.plans.physical.{AllTuples, ClusteredDistribution, Distribution, Partitioning}
import org.apache.spark.sql.execution.{GroupedIterator, SparkPlan, UnaryExecNode}
import org.apache.spark.sql.execution.{SparkPlan, UnaryExecNode}
import org.apache.spark.sql.types.StructType
import org.apache.spark.sql.util.ArrowUtils
import org.apache.spark.sql.vectorized.{ArrowColumnVector, ColumnarBatch}


/**
* Physical node for [[org.apache.spark.sql.catalyst.plans.logical.FlatMapGroupsInPandas]]
Expand All @@ -53,7 +48,7 @@ case class FlatMapGroupsInPandasExec(
func: Expression,
output: Seq[Attribute],
child: SparkPlan)
extends BasePandasGroupExec(func, output) with UnaryExecNode {
extends BasePandasGroupExec(func, output) with UnaryExecNode {

override def outputPartitioning: Partitioning = child.outputPartitioning
