Skip to content
Closed
Prev Previous commit
Next Next commit
Copy over support work weakset, dynamic classess, and remove __transi…
…ent__ support from PR#110
  • Loading branch information
holdenk committed Jul 24, 2017
commit 1cfd38f73da328bcf58ab32228ace4ff59bc26d2
123 changes: 90 additions & 33 deletions python/pyspark/cloudpickle.py
Original file line number Diff line number Diff line change
Expand Up @@ -393,6 +393,71 @@ def _save_subimports(self, code, top_level_dependencies):
# then discards the reference to it
self.write(pickle.POP)

def save_dynamic_class(self, obj):
"""
Save a class that can't be stored as module global.

This method is used to serialize classes that are defined inside
functions, or that otherwise can't be serialized as attribute lookups
from global modules.
"""
clsdict = dict(obj.__dict__) # copy dict proxy to a dict
if not isinstance(clsdict.get('__dict__', None), property):
# don't extract dict that are properties
clsdict.pop('__dict__', None)
clsdict.pop('__weakref__', None)

# hack as __new__ is stored differently in the __dict__
new_override = clsdict.get('__new__', None)
if new_override:
clsdict['__new__'] = obj.__new__

save = self.save
write = self.write

# We write pickle instructions explicitly here to handle the
# possibility that the type object participates in a cycle with its own
# __dict__. We first write an empty "skeleton" version of the class and
# memoize it before writing the class' __dict__ itself. We then write
# instructions to "rehydrate" the skeleton class by restoring the
# attributes from the __dict__.
#
# A type can appear in a cycle with its __dict__ if an instance of the
# type appears in the type's __dict__ (which happens for the stdlib
# Enum class), or if the type defines methods that close over the name
# of the type, (which is common for Python 2-style super() calls).

# Push the rehydration function.
save(_rehydrate_skeleton_class)

# Mark the start of the args for the rehydration function.
write(pickle.MARK)

# On PyPy, __doc__ is a readonly attribute, so we need to include it in
# the initial skeleton class. This is safe because we know that the
# doc can't participate in a cycle with the original class.
doc_dict = {'__doc__': clsdict.pop('__doc__', None)}

# Create and memoize an empty class with obj's name and bases.
save(type(obj))
save((
obj.__name__,
obj.__bases__,
doc_dict,
))
write(pickle.REDUCE)
self.memoize(obj)

# Now save the rest of obj's __dict__. Any references to obj
# encountered while saving will point to the skeleton class.
save(clsdict)

# Write a tuple of (skeleton_class, clsdict).
write(pickle.TUPLE)

# Call _rehydrate_skeleton_class(skeleton_class, clsdict)
write(pickle.REDUCE)

def save_function_tuple(self, func):
""" Pickles an actual func object.

Expand Down Expand Up @@ -513,6 +578,12 @@ def save_builtin_function(self, obj):
dispatch[types.BuiltinFunctionType] = save_builtin_function

def save_global(self, obj, name=None, pack=struct.pack):
"""
Save a "global".

The name of this method is somewhat misleading: all types get
dispatched here.
"""
if obj.__module__ == "__builtin__" or obj.__module__ == "builtins":
if obj in _BUILTIN_TYPE_NAMES:
return self.save_reduce(_builtin_type, (_BUILTIN_TYPE_NAMES[obj],), obj=obj)
Expand All @@ -536,18 +607,7 @@ def save_global(self, obj, name=None, pack=struct.pack):

typ = type(obj)
if typ is not obj and isinstance(obj, (type, types.ClassType)):
d = dict(obj.__dict__) # copy dict proxy to a dict
if not isinstance(d.get('__dict__', None), property):
# don't extract dict that are properties
d.pop('__dict__', None)
d.pop('__weakref__', None)

# hack as __new__ is stored differently in the __dict__
new_override = d.get('__new__', None)
if new_override:
d['__new__'] = obj.__new__

self.save_reduce(typ, (obj.__name__, obj.__bases__, d), obj=obj)
self.save_dynamic_class(obj)
else:
raise pickle.PicklingError("Can't pickle %r" % obj)

Expand All @@ -567,8 +627,7 @@ def save_instancemethod(self, obj):
dispatch[types.MethodType] = save_instancemethod

def save_inst(self, obj):
"""Inner logic to save instance. Based off pickle.save_inst
Supports __transient__"""
"""Inner logic to save instance. Based off pickle.save_inst"""
cls = obj.__class__

# Try the dispatch table (pickle module doesn't do it)
Expand Down Expand Up @@ -606,13 +665,6 @@ def save_inst(self, obj):
getstate = obj.__getstate__
except AttributeError:
stuff = obj.__dict__
#remove items if transient
if hasattr(obj, '__transient__'):
transient = obj.__transient__
stuff = stuff.copy()
for k in list(stuff.keys()):
if k in transient:
del stuff[k]
else:
stuff = getstate()
pickle._keep_alive(stuff, memo)
Expand Down Expand Up @@ -675,8 +727,6 @@ def __getattribute__(self, item):

def save_reduce(self, func, args, state=None,
listitems=None, dictitems=None, obj=None):
"""Modified to support __transient__ on new objects
Change only affects protocol level 2 (which is always used by PiCloud"""
# Assert that args is a tuple or None
if not isinstance(args, tuple):
raise pickle.PicklingError("args from reduce() should be a tuple")
Expand All @@ -690,7 +740,6 @@ def save_reduce(self, func, args, state=None,

# Protocol 2 special case: if func's name is __newobj__, use NEWOBJ
if self.proto >= 2 and getattr(func, "__name__", "") == "__newobj__":
#Added fix to allow transient
cls = args[0]
if not hasattr(cls, "__new__"):
raise pickle.PicklingError(
Expand All @@ -701,15 +750,6 @@ def save_reduce(self, func, args, state=None,
args = args[1:]
save(cls)

#Don't pickle transient entries
if hasattr(obj, '__transient__'):
transient = obj.__transient__
state = state.copy()

for k in list(state.keys()):
if k in transient:
del state[k]

save(args)
write(pickle.NEWOBJ)
else:
Expand Down Expand Up @@ -798,6 +838,13 @@ def save_not_implemented(self, obj):
dispatch[type(Ellipsis)] = save_ellipsis
dispatch[type(NotImplemented)] = save_not_implemented

# WeakSet was added in 2.7.
if hasattr(weakref, 'WeakSet'):
def save_weakset(self, obj):
self.save_reduce(weakref.WeakSet, (list(obj),))

dispatch[weakref.WeakSet] = save_weakset

"""Special functions for Add-on libraries"""
def inject_addons(self):
"""Plug in system. Register additional pickling functions if modules already loaded"""
Expand Down Expand Up @@ -986,6 +1033,16 @@ def _make_skel_func(code, cell_count, base_globals=None):
return types.FunctionType(code, base_globals, None, None, closure)


def _rehydrate_skeleton_class(skeleton_class, class_dict):
"""Put attributes from `class_dict` back on `skeleton_class`.

See CloudPickler.save_dynamic_class for more info.
"""
for attrname, attr in class_dict.items():
setattr(skeleton_class, attrname, attr)
return skeleton_class


def _find_module(mod_name):
"""
Iterate over each part instead of calling imp.find_module directly.
Expand Down