Skip to content
Closed
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
Move code block from load_from_path to value
  • Loading branch information
schintap committed Nov 28, 2018
commit d9994b7f9b6aaaf9f87ff09b1d45f3c204f7b4d3
22 changes: 11 additions & 11 deletions python/pyspark/broadcast.py
Original file line number Diff line number Diff line change
Expand Up @@ -121,16 +121,8 @@ def dump(self, value, f):
f.close()

def load_from_path(self, path):
# we only need to decrypt it here if its on the driver since executor
# decryption handled already
if self._sc is not None and self._sc._encryption_enabled:
port, auth_secret = self._python_broadcast.setupDecryptionServer()
(decrypted_sock_file, _) = local_connect_and_auth(port, auth_secret)
self._python_broadcast.waitTillBroadcastDataSent()
return self.load(decrypted_sock_file)
else:
with open(path, 'rb', 1 << 20) as f:
return self.load(f)
with open(path, 'rb', 1 << 20) as f:
return self.load(f)

def load(self, file):
# "file" could also be a socket
Expand All @@ -145,7 +137,15 @@ def value(self):
""" Return the broadcasted value
"""
if not hasattr(self, "_value") and self._path is not None:
self._value = self.load_from_path(self._path)
# we only need to decrypt it here when encryption is enabled and
# if its on the driver, since executor decryption is handled already
if self._sc._encryption_enabled:
port, auth_secret = self._python_broadcast.setupDecryptionServer()
(decrypted_sock_file, _) = local_connect_and_auth(port, auth_secret)
self._python_broadcast.waitTillBroadcastDataSent()
return self.load(decrypted_sock_file)
else:
self._value = self.load_from_path(self._path)
return self._value

def unpersist(self, blocking=False):
Expand Down