Changes from 1 commit (44 commits in this pull request)
512958b  Basework (HeartSaVioR, Jul 12, 2022)
d36373b  Add Python implementation (HyukjinKwon, Jul 14, 2022)
f754fd9  Reorder key attributes from deduplicated data attributes (HyukjinKwon, Jul 27, 2022)
5194e0c  Apply suggestions from code review (HyukjinKwon, Jul 27, 2022)
1301ee5  Refactoring a bit to respect the column order (HyukjinKwon, Aug 11, 2022)
135a826  WIP Changes to execute in pipelined manner (HeartSaVioR, Aug 15, 2022)
9282e5c  WIP further optimization (HeartSaVioR, Aug 18, 2022)
a792c98  WIP comments for more tunes (HeartSaVioR, Aug 18, 2022)
27e7af9  WIP further tune... (HeartSaVioR, Aug 18, 2022)
04a6b98  WIP done more tune! didn't do any of pandas/arrow side tunes (HeartSaVioR, Aug 18, 2022)
765f4d3  WIP avoid adding additional empty row for state, empty row will be ad… (HeartSaVioR, Aug 19, 2022)
9e11225  WIP remove debug log (HeartSaVioR, Aug 19, 2022)
f33d978  WIP hack around to see the possibility of perf gain on binpacking (HeartSaVioR, Aug 27, 2022)
8604fdf  WIP proper work to apply binpacking on python worker -> executor (HeartSaVioR, Aug 27, 2022)
0d024e0  WIP fix silly bug (HeartSaVioR, Aug 27, 2022)
43c623b  WIP another silly bugfix on migration (HeartSaVioR, Aug 27, 2022)
af1725a  WIP apply binpacking for executor -> python worker as well (HeartSaVioR, Aug 27, 2022)
31e9687  WIP fix silly bug (HeartSaVioR, Aug 27, 2022)
cad77a2  WIP fix another silly bug (HeartSaVioR, Aug 27, 2022)
c3da996  WIP batching per specified size, with sampling (HeartSaVioR, Aug 29, 2022)
cfb2780  WIP introduce DBR-only change (HeartSaVioR, Aug 29, 2022)
228b140  WIP debugging now... (HeartSaVioR, Aug 29, 2022)
ee4ed57  WIP still debugging... weirdness happened (HeartSaVioR, Aug 30, 2022)
4045ab3  WIP small fix (HeartSaVioR, Aug 30, 2022)
2d115ab  WIP fix a serious bug... make sure all columns in Arrow RecordBatch h… (HeartSaVioR, Aug 30, 2022)
3e7d785  WIP strengthen test (HeartSaVioR, Aug 30, 2022)
029dae7  WIP documenting the changes for pipelining and bin-packing... not yet… (HeartSaVioR, Sep 2, 2022)
d7ecaf9  WIP sync (HeartSaVioR, Sep 2, 2022)
6a6dd20  WIP start with is_last_chunk since it's easier to implement... severa… (HeartSaVioR, Sep 2, 2022)
5cfd59c  WIP adjust the test code to make test pass with multiple calls (HeartSaVioR, Sep 2, 2022)
63f8f87  WIP refactor a bit... just extract the abstract classes to explicit ones (HeartSaVioR, Sep 5, 2022)
6e772cd  WIP iterator of DatFrame done! updated tests and they all passed (HeartSaVioR, Sep 5, 2022)
00836b5  WIP FIX pyspark side test failure (HeartSaVioR, Sep 6, 2022)
5fdde94  WIP sort out codebase a bit (HeartSaVioR, Sep 14, 2022)
e7ad043  WIP no batch query support in applyInPandasWithState (HeartSaVioR, Sep 6, 2022)
5070b81  WIP address some missed things (HeartSaVioR, Sep 6, 2022)
1b919b8  WIP remove comments which are obsolete or won't be addressed (HeartSaVioR, Sep 7, 2022)
198fc17  WIP change the return type of user function to Iterator[DataFrame] (HeartSaVioR, Sep 7, 2022)
f2a75f1  WIP remove unnecessary interface/implementation changes on GroupState… (HeartSaVioR, Sep 13, 2022)
3e5f5d4  WIP refine out some code (HeartSaVioR, Sep 13, 2022)
4e34d29  WIP fix scalastyle (HeartSaVioR, Sep 13, 2022)
50e743e  WIP remove obsolete class (HeartSaVioR, Sep 13, 2022)
d22d7db  WIP remove the temp fix (HeartSaVioR, Sep 13, 2022)
e60408f  remove unused code (HeartSaVioR, Sep 14, 2022)
Refactoring a bit to respect the column order
HyukjinKwon authored and HeartSaVioR committed Sep 14, 2022
commit 1301ee5bcce93bfb3075e4ba69ded530ace2f4c6
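For context: this pull request adds `applyInPandasWithState` (arbitrary stateful processing over grouped pandas DataFrames in Structured Streaming), and the commit shown here refactors how key/value column order is resolved between the JVM and the Python worker. A rough usage sketch follows; the call shape matches the API as it eventually shipped, but the `spark` session, the rate source, the derived `key` column, and the schemas are illustrative placeholders, not taken from this diff.

```python
from typing import Iterator, Tuple

import pandas as pd
from pyspark.sql.functions import col
from pyspark.sql.streaming.state import GroupState, GroupStateTimeout

def count_per_key(
    key: Tuple, pdfs: Iterator[pd.DataFrame], state: GroupState
) -> Iterator[pd.DataFrame]:
    # Keep a running count for this key in the group state.
    (running,) = state.get if state.exists else (0,)
    for pdf in pdfs:
        running += len(pdf)
    state.update((running,))
    yield pd.DataFrame({"key": [key[0]], "count": [running]})

# Streaming only: batch queries are rejected (see the commit list above).
events = spark.readStream.format("rate").load().withColumn("key", col("value") % 10)

counts = events.groupBy("key").applyInPandasWithState(
    count_per_key,
    outputStructType="key long, count long",
    stateStructType="count long",
    outputMode="Update",
    timeoutConf=GroupStateTimeout.NoTimeout,
)
```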
28 changes: 8 additions & 20 deletions python/pyspark/worker.py
@@ -523,38 +523,26 @@ def extract_key_value_indexes(grouped_arg_offsets):
idx += offsets_len
return parsed

if eval_type == PythonEvalType.SQL_GROUPED_MAP_PANDAS_UDF:
# We assume there is only one UDF here because grouped map doesn't
# support combining multiple UDFs.
assert num_udfs == 1

# See FlatMapGroupsInPandasExec for how arg_offsets are used to
# distinguish between grouping attributes and data attributes
arg_offsets, f = read_single_udf(pickleSer, infile, eval_type, runner_conf, udf_index=0)
parsed_offsets = extract_key_value_indexes(arg_offsets)

# Create function like this:
# mapper a: f([a[0]], [a[0], a[1]])
def mapper(a):
keys = [a[o] for o in parsed_offsets[0][0]]
vals = [a[o] for o in parsed_offsets[0][1]]
return f(keys, vals)

elif eval_type == PythonEvalType.SQL_GROUPED_MAP_PANDAS_UDF_WITH_STATE:
if eval_type in (
PythonEvalType.SQL_GROUPED_MAP_PANDAS_UDF,
PythonEvalType.SQL_GROUPED_MAP_PANDAS_UDF_WITH_STATE,
):
# We assume there is only one UDF here because grouped map doesn't
# support combining multiple UDFs.
assert num_udfs == 1

# See PythonFlatMapGroupsWithStateExec for how arg_offsets are used to
# See FlatMapGroupsInPandas(WithState)Exec for how arg_offsets are used to
# distinguish between grouping attributes and data attributes
arg_offsets, f = read_single_udf(
pickleSer, infile, eval_type, runner_conf, udf_index=0, state=state
)
parsed_offsets = extract_key_value_indexes(arg_offsets)

# Create function like this:
# mapper a: f([a[0]], [a[0], a[1]])
def mapper(a):
keys = [a[o] for o in parsed_offsets[0][0]]
vals = a # it's always all series.
vals = [a[o] for o in parsed_offsets[0][1]]
return f(keys, vals)

elif eval_type == PythonEvalType.SQL_COGROUPED_MAP_PANDAS_UDF:
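The worker.py change above folds the two grouped-map eval types into a single mapper that picks both key and value series through `parsed_offsets`, so the state variant now respects the column order resolved on the JVM side instead of passing every series through unchanged. Below is a minimal standalone sketch of that selection logic; `parsed_offsets` is a hypothetical value mirroring the `mapper a: f([a[0]], [a[0], a[1]])` comment (key is column 0, values are columns 0 and 1).

```python
import pandas as pd

# Hypothetical offsets: one group whose key is column 0 and whose values
# are columns 0 and 1 of the deduplicated input.
parsed_offsets = [[[0], [0, 1]]]

def make_mapper(f):
    def mapper(a):
        # 'a' is the list of series for one group; select key and value
        # series by offset, preserving the JVM-resolved column order.
        keys = [a[o] for o in parsed_offsets[0][0]]
        vals = [a[o] for o in parsed_offsets[0][1]]
        return f(keys, vals)
    return mapper

# Toy input standing in for the Arrow columns of one group.
a = [pd.Series([1, 1], name="id"), pd.Series([0.5, 1.5], name="v")]
mapper = make_mapper(lambda keys, vals: ([k.name for k in keys], [v.name for v in vals]))
print(mapper(a))  # (['id'], ['id', 'v'])
```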
FlatMapCoGroupsInPandasExec.scala
@@ -78,8 +78,8 @@ case class FlatMapCoGroupsInPandasExec(

override protected def doExecute(): RDD[InternalRow] = {

val (leftDedup, leftArgOffsets) = resolveArgOffsets(left, leftGroup)
val (rightDedup, rightArgOffsets) = resolveArgOffsets(right, rightGroup)
val (leftDedup, leftArgOffsets) = resolveArgOffsets(left.output, leftGroup)
val (rightDedup, rightArgOffsets) = resolveArgOffsets(right.output, rightGroup)

// Map cogrouped rows to ArrowPythonRunner results, Only execute if partition is not empty
left.execute().zipPartitions(right.execute()) { (leftData, rightData) =>
FlatMapGroupsInPandasExec.scala
@@ -75,7 +75,7 @@ case class FlatMapGroupsInPandasExec(
override protected def doExecute(): RDD[InternalRow] = {
val inputRDD = child.execute()

val (dedupAttributes, argOffsets) = resolveArgOffsets(child, groupingAttributes)
val (dedupAttributes, argOffsets) = resolveArgOffsets(child.output, groupingAttributes)

// Map grouped rows to ArrowPythonRunner results, Only execute if partition is not empty
inputRDD.mapPartitionsInternal { iter => if (iter.isEmpty) iter else {
FlatMapGroupsInPandasWithStateExec.scala
@@ -78,9 +78,9 @@ case class FlatMapGroupsInPandasWithStateExec(
private val pythonRunnerConf = ArrowUtils.getPythonRunnerConfMap(conf)
private val pythonFunction = functionExpr.asInstanceOf[PythonUDF].func
private val chainedFunc = Seq(ChainedPythonFunctions(Seq(pythonFunction)))
private lazy val (dedupAttributes, argOffsets) = resolveArgOffsets(child, groupingAttributes)
private lazy val unsafeProj = UnsafeProjection.create(
dedupAttributes, groupingAttributes ++ dedupAttributes)
private lazy val (dedupAttributes, argOffsets) = resolveArgOffsets(
groupingAttributes ++ child.output, groupingAttributes)
private lazy val unsafeProj = UnsafeProjection.create(dedupAttributes, child.output)

override def requiredChildDistribution: Seq[Distribution] =
StatefulOperatorPartitioning.getCompatibleDistribution(
@@ -129,7 +129,7 @@ case class FlatMapGroupsInPandasWithStateExec(
new GenericInternalRow(Array.fill(dedupAttributes.length)(null: Any))))
Iterator.single(Iterator.single(joinedKeyRow))
} else {
Iterator.single(valueRowIter)
Iterator.single(valueRowIter.map(unsafeProj))
}

val ret = executePython(inputIter, output, runner).toArray
PandasGroupUtils.scala
@@ -24,7 +24,7 @@ import org.apache.spark.TaskContext
import org.apache.spark.api.python.BasePythonRunner
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.expressions.{Attribute, UnsafeProjection}
import org.apache.spark.sql.execution.{GroupedIterator, SparkPlan}
import org.apache.spark.sql.execution.GroupedIterator
import org.apache.spark.sql.vectorized.{ArrowColumnVector, ColumnarBatch}

/**
@@ -88,9 +88,10 @@ private[python] object PandasGroupUtils {
* argOffsets[argOffsets[0]+2 .. ] is the arg offsets for data attributes
*/
def resolveArgOffsets(
child: SparkPlan, groupingAttributes: Seq[Attribute]): (Seq[Attribute], Array[Int]) = {
attributes: Seq[Attribute],
groupingAttributes: Seq[Attribute]): (Seq[Attribute], Array[Int]) = {

val dataAttributes = child.output.drop(groupingAttributes.length)
val dataAttributes = attributes.drop(groupingAttributes.length)
val groupingIndicesInData = groupingAttributes.map { attribute =>
dataAttributes.indexWhere(attribute.semanticEquals)
}
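`resolveArgOffsets` now takes a plain attribute list instead of a `SparkPlan`, which is what lets `FlatMapGroupsInPandasWithStateExec` pass `groupingAttributes ++ child.output` and reuse the same deduplication. The following is a simplified Python rendering of the idea, for illustration only: the real helper compares attributes with `semanticEquals` and packs the result into the flat `arg_offsets` layout that `worker.py` parses, both of which are omitted here.

```python
def resolve_arg_offsets(columns, grouping_columns):
    # 'columns' lists grouping columns first, then data columns.
    data_columns = columns[len(grouping_columns):]
    # Grouping columns not already present among the data columns are kept up front.
    extra_keys = [g for g in grouping_columns if g not in data_columns]
    dedup_columns = extra_keys + data_columns
    key_offsets = [dedup_columns.index(g) for g in grouping_columns]
    value_offsets = [len(extra_keys) + i for i in range(len(data_columns))]
    return dedup_columns, key_offsets, value_offsets

# A key column such as "id" that also exists in the data is not duplicated:
print(resolve_arg_offsets(["id", "id", "v"], ["id"]))
# (['id', 'v'], [0], [0, 1])
```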
PythonUDFSuite.scala
@@ -73,7 +73,7 @@ class PythonUDFSuite extends QueryTest with SharedSparkSession {
}

test("SPARK-39962: Global aggregation of Pandas UDF should respect the column order") {
assume(shouldTestGroupedAggPandasUDFs)
assume(shouldTestPythonUDFs)
val df = Seq[(java.lang.Integer, java.lang.Integer)]((1, null)).toDF("a", "b")

val pandasTestUDF = TestGroupedAggPandasUDF(name = "pandas_udf")
Expand Down