Changes from 1 commit
Commits (41)
60e4e2f
support unpickle array.array for Python 2.6
davies Sep 11, 2014
c77c87b
cleanup debugging code
davies Sep 13, 2014
3908f5c
Merge branch 'master' into pickle
davies Sep 13, 2014
f44f771
enable tests about array
davies Sep 13, 2014
b30ef35
use pickle to serialize data for mllib/recommendation
davies Sep 13, 2014
52d1350
use new protocol in mllib/stat
davies Sep 13, 2014
f1544c4
refactor clustering
davies Sep 13, 2014
aa2287e
random
davies Sep 13, 2014
8fe166a
Merge branch 'pickle' into pickle_mllib
davies Sep 13, 2014
cccb8b1
mllib/tree
davies Sep 14, 2014
d9f691f
mllib/util
davies Sep 14, 2014
f2a0856
mllib/regression
davies Sep 14, 2014
c383544
classification
davies Sep 14, 2014
6d26b03
fix tests
davies Sep 14, 2014
4d7963e
remove manual serialization
davies Sep 14, 2014
84c721d
Merge branch 'master' into pickle_mllib
davies Sep 14, 2014
b02e34f
remove _common.py
davies Sep 14, 2014
0ee1525
remove outdated tests
davies Sep 14, 2014
722dd96
cleanup _common.py
davies Sep 15, 2014
f3506c5
Merge branch 'master' into pickle_mllib
davies Sep 16, 2014
df19464
memorize the module and class name during pickling
davies Sep 16, 2014
46a501e
choose batch size automatically
davies Sep 16, 2014
88034f0
refactor, address comments
davies Sep 16, 2014
9dcfb63
fix style
davies Sep 16, 2014
708dc02
fix tests
davies Sep 16, 2014
e1d1bfc
refactor
davies Sep 16, 2014
44736d7
speed up pickling array in Python 2.7
davies Sep 16, 2014
154d141
fix autobatchedpickler
davies Sep 16, 2014
df625c7
Merge commit '154d141' into pickle_mllib
davies Sep 16, 2014
a379a81
fix pickle array in python2.7
davies Sep 16, 2014
44e0551
fix cache
davies Sep 16, 2014
9ceff73
test size of serialized Rating
davies Sep 16, 2014
a2cc855
fix tests
davies Sep 16, 2014
1fccf1a
address comments
davies Sep 16, 2014
2511e76
cleanup
davies Sep 16, 2014
19d0967
refactor Picklers
davies Sep 17, 2014
e431377
fix cache of rdd, refactor
davies Sep 17, 2014
bd738ab
address comments
davies Sep 18, 2014
032cd62
add more type check and conversion for user_product
davies Sep 18, 2014
810f97f
fix equal of matrix
davies Sep 18, 2014
dffbba2
Merge branch 'master' of github.com:apache/spark into pickle_mllib
davies Sep 19, 2014
choose batch size automatically
davies committed Sep 16, 2014
commit 46a501e5079b5543360783a541b289333c3293ea
33 changes: 27 additions & 6 deletions core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala
@@ -775,17 +775,38 @@ private[spark] object PythonRDD extends Logging {
    }.toJavaRDD()
  }

+  private class AutoBatchedPickler(iter: Iterator[Any]) extends Iterator[Array[Byte]] {
+    private val pickle = new Pickler()
+    private var batch = 1
+    private val buffer = new mutable.ArrayBuffer[Any]
+
+    override def hasNext(): Boolean = iter.hasNext
+
+    override def next(): Array[Byte] = {
+      while (iter.hasNext && buffer.length < batch) {
+        buffer += iter.next()
+      }
+      val bytes = pickle.dumps(buffer)
+      val size = bytes.length
+      // let 1M < size < 10M
+      if (size < 1024 * 100) {
+        batch = (1024 * 100) / size // fast grow
Contributor:
If the first record is small, e.g., a SparseVector with a single nonzero, and the records that follow are large vectors, line 789 may cause memory problems. Does it give a significant performance gain? Under what circumstances?

Contributor (author):
Good question. Without this fast path, batch may need to grow about 15 times before it stabilizes, but that is acceptable and safer. I will remove this fast path.

+      } else if (size < 1024 * 1024) {
+        batch *= 2
+      } else if (size > 1024 * 1024 * 10) {
+        batch /= 2
Contributor:
If the first record is very large, batch will be 0.

+      }
+      buffer.clear()
+      bytes
+    }
+  }

  /**
   * Convert an RDD of Java objects to an RDD of serialized Python objects that is usable
   * by PySpark.
   */
  def javaToPython(jRDD: JavaRDD[Any]): JavaRDD[Array[Byte]] = {
-    jRDD.rdd.mapPartitions { iter =>
-      val pickle = new Pickler
-      iter.grouped(1024).map { rows =>
-        pickle.dumps(rows.toArray)
-      }
-    }
+    jRDD.rdd.mapPartitions { iter => new AutoBatchedPickler(iter) }
  }

…
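Both review comments above describe real edge cases in the adaptation rule. A minimal Python sketch of the same thresholds (a hypothetical next_batch helper, not code from this PR) makes the two failure modes concrete:

def next_batch(batch, size):
    """Mimic AutoBatchedPickler: `size` is the pickled size in bytes
    of the current `batch` of records; returns the next batch size."""
    if size < 1024 * 100:
        return (1024 * 100) // size   # fast grow
    elif size < 1024 * 1024:
        return batch * 2              # steady doubling toward 1MB
    elif size > 1024 * 1024 * 10:
        return batch // 2             # shrink when above 10MB
    return batch

# First comment: a ~50-byte first record (e.g. a SparseVector with one
# nonzero) fast-grows the batch to 2048; if the following records are
# ~1MB dense vectors, the buffer holds ~2GB before the next dumps().
assert next_batch(1, 50) == 2048

# Second comment: a single record above 10MB drives batch to 0, and a
# batch of 0 would never consume another record from the iterator.
assert next_batch(1, 11 * 1024 * 1024) == 0

# Without the fast path, growth is pure doubling: starting from 1, it
# takes about 15 doublings (2**15 records) for small records to reach
# the 1MB target, which is the "grow about 15 times" in the reply above.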
8 changes: 3 additions & 5 deletions python/pyspark/rdd.py
@@ -34,7 +34,7 @@

from pyspark.serializers import NoOpSerializer, CartesianDeserializer, \
    BatchedSerializer, CloudPickleSerializer, PairDeserializer, \
-    PickleSerializer, pack_long, CompressedSerializer
+    PickleSerializer, pack_long, AutoBatchedSerializer
from pyspark.join import python_join, python_left_outer_join, \
    python_right_outer_join, python_cogroup
from pyspark.statcounter import StatCounter
@@ -1927,10 +1927,8 @@ def _to_java_object_rdd(self):
        It will convert each Python object into a Java object via Pyrolite,
        whether the RDD is serialized in batches or not.
        """
-        if not self._is_pickled():
-            self = self._reserialize(BatchedSerializer(PickleSerializer(), 1024))
-        batched = isinstance(self._jrdd_deserializer, BatchedSerializer)
-        return self.ctx._jvm.PythonRDD.pythonToJava(self._jrdd, batched)
+        rdd = self._reserialize(AutoBatchedSerializer(PickleSerializer()))
+        return self.ctx._jvm.PythonRDD.pythonToJava(rdd._jrdd, True)

    def countApprox(self, timeout, confidence=0.95):
        """
…
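For context, a sketch of how this private helper is typically reached (assuming a live PySpark 1.x SparkContext named sc; MLlib's Python wrappers use it to hand data to the JVM):

# Sketch only: assumes PySpark 1.x and a running SparkContext `sc`.
rdd = sc.parallelize(range(100))
# Re-serializes with AutoBatchedSerializer(PickleSerializer()) and passes
# the batched pickled bytes to the JVM, which unpickles them via Pyrolite.
jrdd = rdd._to_java_object_rdd()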
37 changes: 37 additions & 0 deletions python/pyspark/serializers.py
@@ -68,6 +68,7 @@
import types
import collections
import zlib
+import itertools

from pyspark import cloudpickle

@@ -211,6 +212,42 @@ def __str__(self):
return "BatchedSerializer<%s>" % str(self.serializer)


+class AutoBatchedSerializer(BatchedSerializer):
+    """
+    Choose the batch size automatically based on the size of objects
+    """
+
+    def __init__(self, serializer, bestSize=1 << 20):
+        BatchedSerializer.__init__(self, serializer, -1)
+        self.bestSize = bestSize
+
+    def dump_stream(self, iterator, stream):
+        batch, best = 1, self.bestSize
+        iterator = iter(iterator)
+        while True:
+            vs = list(itertools.islice(iterator, batch))
+            if not vs:
+                break
+
+            bytes = self.serializer.dumps(vs)
+            size = len(bytes)
+            if size < best / 10:
+                batch = best / 10 / size
+            elif size < best:
+                batch *= 2
+            elif size > best * 10:
+                batch /= 2
+            write_int(len(bytes), stream)
+            stream.write(bytes)
+
+    def __eq__(self, other):
+        return (isinstance(other, AutoBatchedSerializer) and
+                other.serializer == self.serializer)
+
+    def __str__(self):
+        return "AutoBatchedSerializer<%s>" % str(self.serializer)


class CartesianDeserializer(FramedSerializer):

"""
Expand Down
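To show the wire format dump_stream produces, here is a self-contained sketch of the same length-prefixed framing; write_int and read_int are re-implemented with struct here rather than imported from pyspark.serializers, so treat the helper names as stand-ins:

import pickle
import struct
from io import BytesIO

def write_int(value, stream):
    # 4-byte big-endian length prefix before each pickled batch.
    stream.write(struct.pack("!i", value))

def read_int(stream):
    return struct.unpack("!i", stream.read(4))[0]

# Write two length-prefixed pickled batches, as dump_stream would.
stream = BytesIO()
for batch in ([1, 2, 3], ["a", "b"]):
    data = pickle.dumps(batch)
    write_int(len(data), stream)
    stream.write(data)

# Read them back, concatenating the batches into one record stream.
stream.seek(0)
records = []
end = len(stream.getvalue())
while stream.tell() < end:
    length = read_int(stream)
    records.extend(pickle.loads(stream.read(length)))
print(records)  # [1, 2, 3, 'a', 'b']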