Skip to content
Open
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Next Next commit
ENH add introspection API for concurrent.futures Executor
  • Loading branch information
tomMoral committed Jan 12, 2018
commit 602d0722e2cfcab7dfe8b3259a4f0309067a3628
67 changes: 67 additions & 0 deletions Doc/library/concurrent.futures.rst
Original file line number Diff line number Diff line change
Expand Up @@ -94,6 +94,73 @@ Executor Objects
e.submit(shutil.copy, 'src3.txt', 'dest3.txt')
e.submit(shutil.copy, 'src4.txt', 'dest4.txt')

The following :class:`Executor` methods can be used to introspect the
state of the :class:`Executor` and of the tasks it processes.

.. method:: worker_count()

Return the actual number of workers in the executor.

.. versionadded:: 3.7

.. method:: active_worker_count()

Return the number of workers currently running a task in the executor.

.. versionadded:: 3.7

.. method:: idle_worker_count()

Return the number of workers currently waiting for a new task to be
submitted to the executor.

.. versionadded:: 3.7

.. method:: task_count()

Return the number of tasks pending for the executor.

.. versionadded:: 3.7

.. method:: active_task_count()

Return the number of tasks which are currently being processed by the
executor.

.. versionadded:: 3.7

.. method:: waiting_task_count()

Return the number of tasks waiting to be processed by the executor.

.. versionadded:: 3.7

.. method:: active_tasks()

Return a set of WorkItems representing the tasks which are currently
being processed by the executor. A WorkItem object is a container
holding the function *fn*, its arguments *args* and *kwargs* and the
associated :class:`Future` in *future*.

.. versionadded:: 3.7

.. method:: waiting_tasks()

Return a list of WorkItems representing the tasks waiting to be
processed by the executor. A WorkItem object is a container holding
the function *fn*, its arguments *args* and *kwargs* and the associated
:class:`Future` in *future*.

.. versionadded:: 3.7


ThreadPoolExecutor
------------------
Expand Down
40 changes: 40 additions & 0 deletions Lib/concurrent/futures/_base.py
Original file line number Diff line number Diff line change
Expand Up @@ -306,6 +306,22 @@ def wait(fs, timeout=None, return_when=ALL_COMPLETED):
done.update(waiter.finished_futures)
return DoneAndNotDoneFutures(done, set(fs) - done)


class _WorkItem(object):
def __init__(self, future, fn, args, kwargs):
self.future = future
self.fn = fn
self.args = args
self.kwargs = kwargs

def __str__(self):
return repr(self)

def __repr__(self):
return "<WorkItem: {} args: {} kwargs: {}>".format(self.fn, self.args,
self.kwargs)


class Future(object):
"""Represents the result of an asynchronous computation."""

Expand Down Expand Up @@ -604,6 +620,30 @@ def shutdown(self, wait=True):
"""
pass

def worker_count(self):
    """Return the actual number of workers in the executor.

    Abstract; concrete Executor subclasses must override this.
    """
    raise NotImplementedError()

def active_worker_count(self):
    """Return the number of workers currently running a task.

    Abstract; concrete Executor subclasses must override this.
    """
    raise NotImplementedError()

def idle_worker_count(self):
    """Return the number of workers waiting for a task to be submitted.

    Abstract; concrete Executor subclasses must override this.
    """
    raise NotImplementedError()

def task_count(self):
    """Return the number of tasks pending for the executor.

    Abstract; concrete Executor subclasses must override this.
    """
    raise NotImplementedError()

def active_task_count(self):
    """Return the number of tasks currently being processed.

    Abstract; concrete Executor subclasses must override this.
    """
    raise NotImplementedError()

def waiting_task_count(self):
    """Return the number of tasks waiting to be processed.

    Abstract; concrete Executor subclasses must override this.
    """
    raise NotImplementedError()

def active_tasks(self):
    """Return the _WorkItems currently being processed by the executor.

    Abstract; concrete Executor subclasses must override this.
    """
    raise NotImplementedError()

def waiting_tasks(self):
    """Return the _WorkItems waiting to be processed by the executor.

    Abstract; concrete Executor subclasses must override this.
    """
    raise NotImplementedError()

def __enter__(self):
    """Enter the runtime context: return the executor itself."""
    return self

Expand Down
52 changes: 45 additions & 7 deletions Lib/concurrent/futures/process.py
Original file line number Diff line number Diff line change
Expand Up @@ -128,12 +128,6 @@ def _rebuild_exc(exc, tb):
exc.__cause__ = _RemoteTraceback(tb)
return exc

class _WorkItem(object):
def __init__(self, future, fn, args, kwargs):
self.future = future
self.fn = fn
self.args = args
self.kwargs = kwargs

class _ResultItem(object):
def __init__(self, work_id, exception=None, result=None):
Expand Down Expand Up @@ -168,6 +162,11 @@ def _on_queue_feeder_error(self, e, obj):
super()._on_queue_feeder_error(e, obj)


class _WorkId(object):
    """Message put on the result queue by a worker process to notify the
    executor that the task identified by *work_id* has started running
    (see _process_worker)."""
    def __init__(self, work_id):
        # Identifier of the work item the worker began processing.
        self.work_id = work_id


def _get_chunks(*iterables, chunksize):
""" Iterates over zip()ed iterables in chunks. """
it = zip(*iterables)
Expand Down Expand Up @@ -226,6 +225,9 @@ def _process_worker(call_queue, result_queue, initializer, initargs):
# Wake up queue management thread
result_queue.put(os.getpid())
return

# Notify the executor that the job work_id is being processed
result_queue.put(_WorkId(call_item.work_id))
try:
r = call_item.fn(*call_item.args, **call_item.kwargs)
except BaseException as e:
Expand Down Expand Up @@ -280,6 +282,7 @@ def _add_call_item_to_queue(pending_work_items,
def _queue_management_worker(executor_reference,
processes,
pending_work_items,
active_work_items,
work_ids_queue,
call_queue,
result_queue,
Expand All @@ -296,6 +299,8 @@ def _queue_management_worker(executor_reference,
workers.
pending_work_items: A dict mapping work ids to _WorkItems e.g.
{5: <_WorkItem...>, 6: <_WorkItem...>, ...}
active_work_items: A dict mapping work ids to _WorkItems being run e.g.
{5: <_WorkItem...>, 6: <_WorkItem...>, ...}
work_ids_queue: A queue.Queue of work ids e.g. Queue([5, 6, ...]).
call_queue: A ctx.Queue that will be filled with _CallItems
derived from _WorkItems for processing by the process workers.
Expand Down Expand Up @@ -400,6 +405,8 @@ def shutdown_worker():
if not processes:
shutdown_worker()
return
elif isinstance(result_item, _WorkId):
active_work_items.add(result_item.work_id)
elif result_item is not None:
work_item = pending_work_items.pop(result_item.work_id, None)
# work_item can be None if another process terminated (see above)
Expand All @@ -410,6 +417,8 @@ def shutdown_worker():
work_item.future.set_result(result_item.result)
# Delete references to object. See issue16284
del work_item

active_work_items.remove(result_item.work_id)
# Delete reference to result_item
del result_item

Expand Down Expand Up @@ -525,6 +534,7 @@ def __init__(self, max_workers=None, mp_context=None,
self._broken = False
self._queue_count = 0
self._pending_work_items = {}
self._active_work_items = set()

# Create communication channels for the executor
# Make the call queue slightly larger than the number of processes to
Expand Down Expand Up @@ -566,6 +576,7 @@ def weakref_cb(_,
args=(weakref.ref(self, weakref_cb),
self._processes,
self._pending_work_items,
self._active_work_items,
self._work_ids,
self._call_queue,
self._result_queue,
Expand Down Expand Up @@ -595,7 +606,7 @@ def submit(self, fn, *args, **kwargs):
raise RuntimeError('cannot schedule new futures after shutdown')

f = _base.Future()
w = _WorkItem(f, fn, args, kwargs)
w = _base._WorkItem(f, fn, args, kwargs)

self._pending_work_items[self._queue_count] = w
self._work_ids.put(self._queue_count)
Expand Down Expand Up @@ -656,4 +667,31 @@ def shutdown(self, wait=True):
self._processes = None
shutdown.__doc__ = _base.Executor.shutdown.__doc__

def worker_count(self):
    """Return the actual number of worker processes in the pool."""
    # NOTE(review): shutdown() sets self._processes to None, so calling
    # this after shutdown raises TypeError — confirm intended.
    return len(self._processes)

def active_worker_count(self):
    """Return the number of workers currently running a task.

    Each active task occupies exactly one worker process, so this
    equals the number of active tasks.
    """
    return self.active_task_count()

def idle_worker_count(self):
    """Return the number of workers currently waiting for a new task."""
    return self.worker_count() - self.active_worker_count()

def task_count(self):
    """Return the number of tasks pending for the executor.

    This counts both tasks currently being processed and tasks still
    waiting to be dispatched to a worker.
    """
    return len(self._pending_work_items)

def active_task_count(self):
    """Return the number of tasks currently being processed."""
    return len(self._active_work_items)

def waiting_task_count(self):
    """Return the number of tasks waiting to be processed."""
    return self.task_count() - self.active_task_count()

def active_tasks(self):
    """Return a set of the _WorkItems currently being processed.

    Each _WorkItem holds the function *fn*, its arguments *args* and
    *kwargs*, and the associated Future in *future*.
    """
    pending = self._pending_work_items
    # Snapshot the active-id set: the queue-management thread mutates it
    # concurrently, and iterating a live set while it changes size
    # raises RuntimeError.  A finished work id can also have been popped
    # from the pending dict before being removed from the active set,
    # so guard the lookup to avoid a spurious KeyError.
    return {pending[work_id]
            for work_id in list(self._active_work_items)
            if work_id in pending}

def waiting_tasks(self):
    """Return a list of the _WorkItems waiting to be processed.

    Each _WorkItem holds the function *fn*, its arguments *args* and
    *kwargs*, and the associated Future in *future*.
    """
    # Snapshot both structures: the queue-management thread mutates
    # them concurrently, and iterating a live dict view while it is
    # resized raises RuntimeError.
    active = set(self._active_work_items)
    return [item
            for work_id, item in list(self._pending_work_items.items())
            if work_id not in active]


atexit.register(_python_exit)
Loading