Skip to content
Closed
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
feedback
  • Loading branch information
squito committed Aug 28, 2018
commit d07d21d83516492155207c06dfbde3a171412a68
8 changes: 4 additions & 4 deletions python/pyspark/java_gateway.py
Original file line number Diff line number Diff line change
Expand Up @@ -147,20 +147,20 @@ def _do_server_auth(conn, auth_secret):
raise Exception("Unexpected reply from iterator server.")


def local_connect_and_auth(sock_info):
def local_connect_and_auth(port, auth_secret):
"""
Connect to local host, authenticate with it, and return a (sockfile,sock) for that connection.
Handles IPV4 & IPV6, does some error handling.
:param sock_info: a tuple of (port, auth_secret) for connecting
:param port
:param auth_secret
:return: a tuple with (sockfile, sock)
"""
port, auth_secret = sock_info
sock = None
errors = []
# Support for both IPv4 and IPv6.
# On most of IPv6-ready systems, IPv6 will take precedence.
for res in socket.getaddrinfo("127.0.0.1", port, socket.AF_UNSPEC, socket.SOCK_STREAM):
af, socktype, proto, canonname, sa = res
af, socktype, proto, _, sa = res
sock = socket.socket(af, socktype, proto)
try:
sock.settimeout(15)
Expand Down
2 changes: 1 addition & 1 deletion python/pyspark/rdd.py
Original file line number Diff line number Diff line change
Expand Up @@ -141,7 +141,7 @@ def _parse_memory(s):


def _load_from_socket(sock_info, serializer):
(sockfile, sock) = local_connect_and_auth(sock_info)
(sockfile, sock) = local_connect_and_auth(*sock_info)
# The RDD materialization time is unpredictable, if we set a timeout for socket reading
# operation, it will very possibly fail. See SPARK-18281.
sock.settimeout(None)
Expand Down
2 changes: 1 addition & 1 deletion python/pyspark/taskcontext.py
Original file line number Diff line number Diff line change
Expand Up @@ -109,7 +109,7 @@ def _load_from_socket(port, auth_secret):
Load data from a given socket. This is a blocking method, so it only returns when the socket
connection has been closed.
"""
(sockfile, sock) = local_connect_and_auth((port, auth_secret))
(sockfile, sock) = local_connect_and_auth(port, auth_secret)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We must set the socket timeout to None so that the barrier() call can block indefinitely.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

good catch, thanks! updated

# Make a barrier() function call.
write_int(BARRIER_FUNCTION, sockfile)
sockfile.flush()
Expand Down
2 changes: 1 addition & 1 deletion python/pyspark/worker.py
Original file line number Diff line number Diff line change
Expand Up @@ -364,5 +364,5 @@ def process():
# Read information about how to connect back to the JVM from the environment.
java_port = int(os.environ["PYTHON_WORKER_FACTORY_PORT"])
auth_secret = os.environ["PYTHON_WORKER_FACTORY_SECRET"]
(sock_file, _) = local_connect_and_auth((java_port, auth_secret))
(sock_file, _) = local_connect_and_auth(java_port, auth_secret)
main(sock_file, sock_file)