Commit dd71ba90cd41722c8fd3811d2c698da344aeb690 ("be safe")
Davies Liu committed Mar 31, 2016
@@ -86,7 +86,7 @@ private[spark] case class ChainedPythonFunctions(funcs: Seq[PythonFunction])
private[spark] object PythonRunner {
def apply(func: PythonFunction, bufferSize: Int, reuse_worker: Boolean): PythonRunner = {
new PythonRunner(
- Seq(ChainedPythonFunctions(Seq(func))), bufferSize, reuse_worker, false, Seq(Seq(0)))
+ Seq(ChainedPythonFunctions(Seq(func))), bufferSize, reuse_worker, false, Array(Array(0)))
}
}

@@ -101,7 +101,7 @@ private[spark] class PythonRunner(
bufferSize: Int,
reuse_worker: Boolean,
isUDF: Boolean,
Contributor: Similarly, do you mind adding scaladoc for these two new parameters?

- argOffsets: Seq[Seq[Int]])
+ argOffsets: Array[Array[Int]])
extends Logging {

require(funcs.length == argOffsets.length, "numArgs should have the same length as funcs")
Contributor: numArgs -> argOffsets

python/pyspark/worker.py (4 changes: 2 additions & 2 deletions)
@@ -29,7 +29,7 @@
from pyspark.broadcast import Broadcast, _broadcastRegistry
from pyspark.files import SparkFiles
from pyspark.serializers import write_with_length, write_int, read_long, \
- write_long, read_int, SpecialLengths, UTF8Deserializer, PickleSerializer, AutoBatchedSerializer
+ write_long, read_int, SpecialLengths, UTF8Deserializer, PickleSerializer, BatchedSerializer
from pyspark import shuffle

pickleSer = PickleSerializer()
@@ -101,7 +101,7 @@ def read_udfs(pickleSer, infile):
mapper = eval(mapper_str, udfs)

func = lambda _, it: map(mapper, it)
- ser = AutoBatchedSerializer(PickleSerializer())
+ ser = BatchedSerializer(PickleSerializer(), 100)
Contributor: What serializer did we use before? 100 seems arbitrary here.

Contributor (author): Before this patch, we used AutoBatchedSerializer, which could hold thousands of rows per batch (holding more rows in the JVM may cause OOM). The value 100 is also used on the Java side.

# profiling is not supported for UDF
return func, None, ser, ser

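For context on the serializer discussion above, here is a minimal sketch of the behavioral difference, assuming a local installation of the pyspark version targeted by this patch (the one whose serializer classes appear in the import diff); the sample data and variable names are illustrative only:

```python
import io

from pyspark.serializers import (AutoBatchedSerializer, BatchedSerializer,
                                 PickleSerializer)

rows = [(i, "x" * 10) for i in range(1000)]

# Old behavior: AutoBatchedSerializer grows each pickled batch until it
# reaches a target byte size, so small rows can pile up by the thousands
# before anything is written out.
old_ser = AutoBatchedSerializer(PickleSerializer())
old_buf = io.BytesIO()
old_ser.dump_stream(iter(rows), old_buf)

# New behavior: BatchedSerializer with an explicit batch size pickles at most
# 100 rows at a time, bounding how many rows are buffered at once.
new_ser = BatchedSerializer(PickleSerializer(), 100)
new_buf = io.BytesIO()
new_ser.dump_stream(iter(rows), new_buf)

# Both streams round-trip to the same rows; only the batch granularity differs.
new_buf.seek(0)
assert list(new_ser.load_stream(new_buf)) == rows
```

Capping each batch at 100 rows bounds how much sits in memory on either side of the worker pipe, which is the OOM concern raised in the comment above.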
@@ -86,13 +86,13 @@ case class BatchPythonEvaluation(udfs: Seq[PythonUDF], output: Seq[Attribute], c
dataTypes += e.dataType
allInputs.length - 1
}
- }
- }
+ }.toArray
+ }.toArray
val projection = newMutableProjection(allInputs, child.output)()

// Input iterator to Python: input rows are grouped so we send them in batches to Python.
// For each row, add it to the queue.
- val inputIterator = iter.grouped(1024).map { inputRows =>
+ val inputIterator = iter.grouped(100).map { inputRows =>
val toBePickled = inputRows.map { inputRow =>
queue.add(inputRow)
val row = projection(inputRow)
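The change above aligns the JVM-side batch size with the new Python-side serializer: input rows are grouped into batches of 100 before being pickled and written to the Python worker. For illustration only, a rough Python equivalent of what Scala's iter.grouped(100) does (the helper name is hypothetical, not part of the patch):

```python
from itertools import islice

def grouped(rows, size=100):
    """Yield lists of at most `size` items, mirroring Scala's Iterator.grouped."""
    it = iter(rows)
    while True:
        batch = list(islice(it, size))
        if not batch:
            return
        yield batch

# 250 input rows become batches of 100, 100 and 50 rows.
print([len(batch) for batch in grouped(range(250))])  # [100, 100, 50]
```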