Skip to content
Open
Show file tree
Hide file tree
Changes from 15 commits
Commits
Show all changes
17 commits
Select commit Hold shift + click to select a range
da98359
Adding a server into socketserver that allows client connections to b…
deliberist Jan 21, 2018
15ec16b
bpo-32608: updated socketserver.rst based on pull-request comments su…
deliberist Jan 23, 2018
5444d6b
bpo-32608: Adding documentation into Doc/whatsnew and Misc/NEWS.d/nex…
deliberist Feb 10, 2018
f7de7b9
bpo-32608: Fixed test_socketserver where it modified the environment …
deliberist Feb 10, 2018
535f13d
Merge remote-tracking branch 'upstream/master' into socketserver_and_…
deliberist May 3, 2018
f84538d
Merge remote-tracking branch 'upstream/master' into socketserver_and_…
deliberist May 5, 2018
5eee0ea
Merge remote-tracking branch 'upstream/master' into socketserver_and_…
deliberist May 5, 2018
cd209db
bpo-32608: Per PR-5258 comments: moved documentation to Python 3.8 fi…
deliberist May 5, 2018
28f397d
bpo-32608: Per PR-5258 comments: made the ProcessingMixIn ensure Proc…
deliberist May 5, 2018
22bdab7
bpo-32608: Per PR-5258 comments, removed "multiprocessing.process._da…
deliberist May 5, 2018
9adc823
bpo-32608: fixing errors with test_socketserver in a Windows environm…
deliberist May 6, 2018
1f32fd7
bpo-32608: Fixing broken test_socketserver tests when run in Linux.
deliberist May 7, 2018
ef2fd3b
bpo-32608: Fixing broken test_socketserver tests when run in Windows.
deliberist May 7, 2018
7097ae6
bpo-32608: Merge remote-tracking branch 'upstream/master' into socket…
deliberist Jun 7, 2018
61ef9f8
Merge pull request #1 from python/master
Mar 16, 2021
77d2cb2
Merge branch 'socketserver_and_multiprocessing' into master
Mar 17, 2021
18d14fd
Merge pull request #2 from rbprogrammer/master
Mar 17, 2021
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
44 changes: 33 additions & 11 deletions Doc/library/socketserver.rst
Original file line number Diff line number Diff line change
Expand Up @@ -44,8 +44,8 @@ completed before the next request can be started. This isn't suitable if each
request takes a long time to complete, because it requires a lot of computation,
or because it returns a lot of data which the client is slow to process. The
solution is to create a separate process or thread to handle each request; the
:class:`ForkingMixIn` and :class:`ThreadingMixIn` mix-in classes can be used to
support asynchronous behaviour.
:class:`ForkingMixIn`, :class:`ProcessingMixIn`, and :class:`ThreadingMixIn`
mix-in classes can be used to support asynchronous behaviour.

Creating a server requires several steps. First, you must create a request
handler class by subclassing the :class:`BaseRequestHandler` class and
Expand Down Expand Up @@ -99,11 +99,14 @@ server classes.


.. class:: ForkingMixIn
ProcessingMixIn
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You need to add a versionadded marker for the new class.

Copy link
Copy Markdown
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Added.

ThreadingMixIn

Forking and threading versions of each type of server can be created
using these mix-in classes. For instance, :class:`ThreadingUDPServer`
is created as follows::
.. versionadded:: 3.8

Forking, multiprocessing, and threading versions of each type of server can
be created using these mix-in classes. For instance,
:class:`ThreadingUDPServer` is created as follows::

class ThreadingUDPServer(ThreadingMixIn, UDPServer):
pass
Expand All @@ -112,6 +115,17 @@ server classes.
:class:`UDPServer`. Setting the various attributes also changes the
behavior of the underlying server mechanism.

The :class:`ForkingMixIn` and :class:`ProcessingMixIn` classes are very
similar in terms of diverting the request connection to a child process.
:class:`ForkingMixIn` does this by spawning a child process via the low-level
:func:`~os.fork` method, while :class:`ProcessingMixIn` does this via the
:class:`multiprocessing.Process` object.

Determining which mix-in class to use depends on the specific scenario for
the server. The :class:`ForkingMixIn` is best suited for light-weight
applications, while the :class:`ProcessingMixIn` is best suited for
applications that make heavy use of the :mod:`multiprocessing` module.

:class:`ForkingMixIn` and the Forking classes mentioned below are
only available on POSIX platforms that support :func:`~os.fork`.

Expand All @@ -137,9 +151,13 @@ server classes.

.. class:: ForkingTCPServer
ForkingUDPServer
ProcessingTCPServer
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ditto here (need a versionadded marker).

ProcessingUDPServer
ThreadingTCPServer
ThreadingUDPServer

.. versionadded:: 3.8

These classes are pre-defined using the mix-in classes.


Expand All @@ -162,7 +180,7 @@ On the other hand, if you are building an HTTP server where all data is stored
externally (for instance, in the file system), a synchronous class will
essentially render the service "deaf" while one request is being handled --
which may be for a very long time if a client is slow to receive all the data it
has requested. Here a threading or forking server is appropriate.
has requested. Here a threading, processing, or forking server is appropriate.

In some cases, it may be appropriate to process part of a request synchronously,
but to finish processing in a forked child depending on the request data. This
Expand Down Expand Up @@ -595,8 +613,8 @@ The output of the example should look exactly like for the TCP server example.
Asynchronous Mixins
~~~~~~~~~~~~~~~~~~~

To build asynchronous handlers, use the :class:`ThreadingMixIn` and
:class:`ForkingMixIn` classes.
To build asynchronous handlers, use the :class:`ThreadingMixIn`,
:class:`ForkingMixIn`, and :class:`ProcessingMixIn` classes.

An example for the :class:`ThreadingMixIn` class::

Expand Down Expand Up @@ -656,7 +674,11 @@ The output of the example should look something like this:
Received: Thread-4: Hello World 3


The :class:`ForkingMixIn` class is used in the same way, except that the server
will spawn a new process for each request.
Available only on POSIX platforms that support :func:`~os.fork`.
On POSIX platforms that support :func:`~os.fork`, both the :class:`ForkingMixIn`
and :class:`ProcessingMixIn` classes can be used in the same way, except that the
server will spawn a new process for each request as opposed to a new thread.

On systems that do not support :func:`~os.fork`, the :class:`ProcessingMixIn` is
still available since it relies on :class:`multiprocessing.Process` to spawn new
processes. The :class:`ForkingMixIn` class is not available on systems that do
not support :func:`~os.fork`.
8 changes: 7 additions & 1 deletion Doc/whatsnew/3.8.rst
Original file line number Diff line number Diff line change
Expand Up @@ -1977,6 +1977,13 @@ Changes in the Python API

* :class:`asyncio.BufferedProtocol` has graduated to the stable API.

* New :mod:`socketserver` classes were added called
:class:`ProcessingUDPServer`, :class:`ProcessingTCPServer`, and
:class:`ProcessingMixIn` to handle client connections in sub-processes
spawned from :mod:`multiprocessing`. This also enabled multiple process
client connection handling on non-POSIX systems that do not support
:func:`os.fork`. (Contributed by rbprogrammer in :issue:`32608`.)

.. _bpo-36085-whatsnew:

* DLL dependencies for extension modules and DLLs loaded with :mod:`ctypes` on
Expand Down Expand Up @@ -2134,7 +2141,6 @@ Changes in the C API

(Contributed by Steve Dower in :issue:`37351`.)


CPython bytecode changes
------------------------

Expand Down
213 changes: 149 additions & 64 deletions Lib/socketserver.py
Original file line number Diff line number Diff line change
Expand Up @@ -123,6 +123,7 @@ class will essentially render the service "deaf" while one request is
__version__ = "0.4"


import multiprocessing
import socket
import selectors
import os
Expand All @@ -134,7 +135,9 @@ class will essentially render the service "deaf" while one request is
__all__ = ["BaseServer", "TCPServer", "UDPServer",
"ThreadingUDPServer", "ThreadingTCPServer",
"BaseRequestHandler", "StreamRequestHandler",
"DatagramRequestHandler", "ThreadingMixIn"]
"DatagramRequestHandler", "ThreadingMixIn",
"ProcessingUDPServer", "ProcessingTCPServer",
"ProcessingMixIn", "ChildProcessManagerMixIn"]
if hasattr(os, "fork"):
__all__.extend(["ForkingUDPServer","ForkingTCPServer", "ForkingMixIn"])
if hasattr(socket, "AF_UNIX"):
Expand Down Expand Up @@ -201,7 +204,7 @@ def __init__(self, server_address, RequestHandlerClass):
"""Constructor. May be extended, do not override."""
self.server_address = server_address
self.RequestHandlerClass = RequestHandlerClass
self.__is_shut_down = threading.Event()
self.__is_shut_down = multiprocessing.Event()
self.__shutdown_request = False

def server_activate(self):
Expand Down Expand Up @@ -539,73 +542,108 @@ def close_request(self, request):
# No need to close anything.
pass

if hasattr(os, "fork"):
class ForkingMixIn:
"""Mix-in class to handle each request in a new process."""

timeout = 300
active_children = None
max_children = 40
# If true, server_close() waits until all child processes complete.
block_on_close = True
class ChildProcessManagerMixIn:
"""Mix-in class to handle each request in a new process."""

def collect_children(self, *, blocking=False):
"""Internal routine to wait for children that have exited."""
if self.active_children is None:
return
timeout = 300
active_children = None
max_children = 40
# If true, server_close() waits until all child processes complete.
block_on_close = True

# If we're above the max number of children, wait and reap them until
# we go back below threshold. Note that we use waitpid(-1) below to be
# able to collect children in size(<defunct children>) syscalls instead
# of size(<children>): the downside is that this might reap children
# which we didn't spawn, which is why we only resort to this when we're
# above max_children.
while len(self.active_children) >= self.max_children:
try:
pid, _ = os.waitpid(-1, 0)
self.active_children.discard(pid)
except ChildProcessError:
# we don't have any children, we're done
self.active_children.clear()
except OSError:
break

# Now reap all defunct children.
for pid in self.active_children.copy():
try:
flags = 0 if blocking else os.WNOHANG
pid, _ = os.waitpid(pid, flags)
# if the child hasn't exited yet, pid will be 0 and ignored by
# discard() below
self.active_children.discard(pid)
except ChildProcessError:
# someone else reaped it
self.active_children.discard(pid)
except OSError:
pass

def handle_timeout(self):
"""Wait for zombies after self.timeout seconds of inactivity.

May be extended, do not override.
"""
self.collect_children()

def service_actions(self):
"""Collect the zombie child processes regularly in the ForkingMixIn.

service_actions is called in the BaseServer's serve_forever loop.
"""
self.collect_children()
# Dictionary of PID to optional sub-class specific objects.
active_children = {}

def _wait_on_any_child(self, blocking):
"""
Waits for the child and returns the child's pid.

:returns: Process id that was joined.
"""
raise NotImplementedError('Sub-classes must override this method.')

def collect_children(self, *, blocking=False):
"""Internal routine to wait for children that have exited."""
if not self.active_children:
return

# If we're above the max number of children, wait and reap them until
# we go back below threshold. Note that we use waitpid(-1) below to be
# able to collect children in size(<defunct children>) syscalls instead
# of size(<children>): the downside is that this might reap children
# which we didn't spawn, which is why we only resort to this when we're
# above max_children.
while len(self.active_children) >= self.max_children:
try:
pid = self._wait_on_any_child(blocking=True)
if pid in self.active_children:
del self.active_children[pid]
except ChildProcessError:
# we don't have any children, we're done
self.active_children.clear()
except OSError:
break

# Now reap all defunct children.
while len(self.active_children):
try:
pid = self._wait_on_any_child(blocking=blocking)
if pid in self.active_children:
del self.active_children[pid]
except ChildProcessError:
if pid in self.active_children:
del self.active_children[pid]
except OSError:
pass

def handle_timeout(self):
"""Wait for zombies after self.timeout seconds of inactivity.

May be extended, do not override.
"""
self.collect_children()

def service_actions(self):
"""Collect the zombie child processes regularly in the ForkingMixIn.

service_actions is called in the BaseServer's serve_forver loop.
"""
self.collect_children()

def process_request(self, request, client_address):
"""
Implementors should create a new subprocess to process the request.
"""
pass

def server_close(self):
super().server_close()
self.collect_children(blocking=self.block_on_close)

if hasattr(os, "fork"):
class ForkingMixIn(ChildProcessManagerMixIn):
"""Mix-in class to handle each request in a new process."""

def _wait_on_any_child(self, blocking):
"""Waits on any forked child to complete."""
while True:
active_pids = list(self.active_children.keys())
for active_pid in active_pids:
pid, _ = os.waitpid(active_pid, os.WNOHANG)
if pid:
return pid
# If not blocking, return after the first pass of not finding
# anything ready.
if not blocking:
return None

def process_request(self, request, client_address):
"""Fork a new subprocess to process the request."""
pid = os.fork()
if pid:
# Parent process
if self.active_children is None:
self.active_children = set()
self.active_children.add(pid)
# Parent process; save the child PID.
self.active_children[pid] = None
self.close_request(request)
return
else:
Expand All @@ -623,9 +661,56 @@ def process_request(self, request, client_address):
finally:
os._exit(status)

def server_close(self):
super().server_close()
self.collect_children(blocking=self.block_on_close)

class ProcessingMixIn(ChildProcessManagerMixIn):
"""Mix-in class to handle each request in a new child :class:`Process`."""

join_timeout = 0.1

def _wait_on_any_child(self, blocking):
"""Waits on any :class:`Process` child to complete."""
timeout = None if blocking else self.join_timeout

# Get all of the joinable processes.
from multiprocessing.connection import wait
children = list(self.active_children.values())
sentinels = [p.sentinel for p in children]
joinable_sentinels = wait(sentinels, timeout)
joinable_processes = [p for p in children
if p.sentinel in joinable_sentinels]

# Just need to join() one.
if joinable_processes:
proc = joinable_processes[0]
proc.join()
return proc.pid
else:
return None

def _process_request_in_child(self, request, client_address):
"""Handles the actual request in the new child process."""
status = 1
try:
self.finish_request(request, client_address)
status = 0
except Exception:
self.handle_error(request, client_address)
finally:
try:
self.shutdown_request(request)
finally:
os._exit(status)

def process_request(self, request, client_address):
"""Create a new :class:`Process` to process the request."""
p = multiprocessing.Process(target=self._process_request_in_child,
args=(request, client_address))
p.start()
self.active_children[p.pid] = p
self.close_request(request)

class ProcessingUDPServer(ProcessingMixIn, UDPServer): pass
class ProcessingTCPServer(ProcessingMixIn, TCPServer): pass


class _Threads(list):
Expand Down
Loading