Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions distributed/bokeh/background/server_lifecycle.py
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ def http_get(route):
logger.info("Can not connect to %s", url, exc_info=True)
return
except HTTPError:
logger.warn("http route %s failed", route)
logger.warning("http route %s failed", route)
return
msg = json.loads(response.body.decode())
messages[route]['deque'].append(msg)
Expand All @@ -58,7 +58,7 @@ def workers():
response = yield client.fetch(
'http://%(host)s:%(http-port)d/workers.json' % options)
except HTTPError:
logger.warn("workers http route failed")
logger.warning("workers http route failed")
return
msg = json.loads(response.body.decode())
if msg:
Expand Down
3 changes: 2 additions & 1 deletion distributed/bokeh/core.py
Original file line number Diff line number Diff line change
Expand Up @@ -53,8 +53,9 @@ def stop(self):
if self.server._tornado._ping_job is not None:
self.server._tornado._ping_job.stop()

# self.server.stop()
# https://github.com/bokeh/bokeh/issues/5494
if bokeh.__version__ >= '0.12.4':
self.server.stop()


def format_bytes(n):
Expand Down
2 changes: 1 addition & 1 deletion distributed/cli/dask_scheduler.py
Original file line number Diff line number Diff line change
Expand Up @@ -149,7 +149,7 @@ def del_pid_file():
except ImportError:
logger.info("Please install Bokeh to get Web UI")
except Exception as e:
logger.warn("Could not start Bokeh web UI", exc_info=True)
logger.warning("Could not start Bokeh web UI", exc_info=True)

logger.info('Local Directory: %26s', local_directory)
logger.info('-' * 47)
Expand Down
4 changes: 3 additions & 1 deletion distributed/cli/dask_worker.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
import logging
import os
import shutil
import signal
from sys import exit
from time import sleep

Expand All @@ -28,7 +29,6 @@

global_nannies = []

import signal

def handle_signal(sig, frame):
loop = IOLoop.instance()
Expand All @@ -40,6 +40,7 @@ def handle_signal(sig, frame):
if loop._running:
loop.add_callback_from_signal(loop.stop)
else:
loop.close()
exit(1)


Expand Down Expand Up @@ -237,6 +238,7 @@ def f():
io_loop=loop2)

loop2.run_sync(f)
loop2.close()

if nanny:
for n in nannies:
Expand Down
6 changes: 3 additions & 3 deletions distributed/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -622,7 +622,7 @@ def _handle_report(self):
msgs = yield self.scheduler_comm.comm.read()
except CommClosedError:
if self.status == 'running':
logger.warn("Client report stream closed to scheduler")
logger.warning("Client report stream closed to scheduler")
logger.info("Reconnecting...")
self.status = 'connecting'
yield self._reconnect()
Expand Down Expand Up @@ -696,7 +696,7 @@ def _handle_restart(self):
self._restart_event.set()

def _handle_error(self, exception=None):
logger.warn("Scheduler exception:")
logger.warning("Scheduler exception:")
logger.exception(exception)

@gen.coroutine
Expand Down Expand Up @@ -1032,7 +1032,7 @@ def wait(k):
response = yield self.scheduler.gather(keys=keys)

if response['status'] == 'error':
logger.warn("Couldn't gather keys %s", response['keys'])
logger.warning("Couldn't gather keys %s", response['keys'])
for key in response['keys']:
self._send_to_scheduler({'op': 'report-key',
'key': key})
Expand Down
2 changes: 1 addition & 1 deletion distributed/comm/inproc.py
Original file line number Diff line number Diff line change
Expand Up @@ -157,7 +157,7 @@ def __init__(self, peer_addr, read_q, write_q, write_loop,
def _get_finalizer(self):
def finalize(write_q=self._write_q, write_loop=self._write_loop,
r=repr(self)):
logger.warn("Closing dangling queue in %s" % (r,))
logger.warning("Closing dangling queue in %s" % (r,))
write_loop.add_callback(write_q.put_nowait, _EOF)

return finalize
Expand Down
10 changes: 5 additions & 5 deletions distributed/comm/tcp.py
Original file line number Diff line number Diff line change
Expand Up @@ -91,7 +91,7 @@ def set_tcp_timeout(stream):
TCP_USER_TIMEOUT = 18 # since Linux 2.6.37
sock.setsockopt(socket.SOL_TCP, TCP_USER_TIMEOUT, timeout * 1000)
except EnvironmentError as e:
logger.warn("Could not set timeout on TCP stream: %s", e)
logger.warning("Could not set timeout on TCP stream: %s", e)


def convert_stream_closed_error(exc):
Expand Down Expand Up @@ -129,7 +129,7 @@ def _read_extra(self):
def _get_finalizer(self):
def finalize(stream=self.stream, r=repr(self)):
if not stream.closed():
logger.warn("Closing dangling stream in %s" % (r,))
logger.warning("Closing dangling stream in %s" % (r,))
stream.close()

return finalize
Expand Down Expand Up @@ -392,9 +392,9 @@ def handle_stream(self, stream, address):
yield stream.wait_for_handshake()
except EnvironmentError as e:
# The handshake went wrong, log and ignore
logger.warn("listener on %r: TLS handshake failed with remote %r: %s",
self.listen_address, address,
getattr(e, "real_error", None) or e)
logger.warning("listener on %r: TLS handshake failed with remote %r: %s",
self.listen_address, address,
getattr(e, "real_error", None) or e)
else:
comm = TLS(stream, address, self.deserialize)
self.comm_handler(comm)
Expand Down
2 changes: 2 additions & 0 deletions distributed/comm/tests/test_comms.py
Original file line number Diff line number Diff line change
Expand Up @@ -406,6 +406,8 @@ def run():
main_loop.add_callback(fut.set_exc_info, sys.exc_info())
else:
main_loop.add_callback(fut.set_result, res)
finally:
thread_loop.close()

t = threading.Thread(target=run)
t.start()
Expand Down
5 changes: 5 additions & 0 deletions distributed/compatibility.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,9 @@
from Queue import Queue, Empty
from io import BytesIO
from thread import get_ident as get_thread_identity
from inspect import getargspec
from cgi import escape as html_escape

reload = reload # flake8: noqa
unicode = unicode # flake8: noqa
PY2 = True
Expand Down Expand Up @@ -51,6 +54,8 @@ def cache_from_source(path):
from threading import get_ident as get_thread_identity
from importlib import invalidate_caches
from importlib.util import cache_from_source
from inspect import getfullargspec as getargspec
from html import escape as html_escape

PY2 = False
PY3 = True
Expand Down
5 changes: 3 additions & 2 deletions distributed/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -48,8 +48,9 @@ def determine_config_file():
try:
ensure_config_file(default_path, path)
except EnvironmentError as e:
logger.warn("Could not write default config file to '%s'. Received error %s",
path, e)
warnings.warn("Could not write default config file to '%s'. "
"Received error %s" % (path, e),
UserWarning)

return path if os.path.exists(path) else default_path

Expand Down
12 changes: 6 additions & 6 deletions distributed/core.py
Original file line number Diff line number Diff line change
Expand Up @@ -241,15 +241,15 @@ def handle_comm(self, comm, shutting_down=shutting_down):
handler = self.handlers[op]
except KeyError:
result = "No handler found: %s" % op
logger.warn(result, exc_info=True)
logger.warning(result, exc_info=True)
else:
logger.debug("Calling into handler %s", handler.__name__)
try:
result = handler(comm, **msg)
if type(result) is gen.Future:
result = yield result
except CommClosedError as e:
logger.warn("Lost connection to %r: %s", address, e)
logger.warning("Lost connection to %r: %s", address, e)
break
except Exception as e:
logger.exception(e)
Expand All @@ -258,8 +258,8 @@ def handle_comm(self, comm, shutting_down=shutting_down):
try:
yield comm.write(result)
except EnvironmentError as e:
logger.warn("Lost connection to %r while sending result for op %r: %s",
address, op, e)
logger.warning("Lost connection to %r while sending result for op %r: %s",
address, op, e)
break
msg = result = None
if close_desired:
Expand Down Expand Up @@ -438,8 +438,8 @@ def __del__(self):
self.status = 'closed'
still_open = [comm for comm in self.comms if not comm.closed()]
if still_open:
logger.warn("rpc object %s deleted with %d open comms",
self, len(still_open))
logger.warning("rpc object %s deleted with %d open comms",
self, len(still_open))
for comm in still_open:
comm.abort()

Expand Down
2 changes: 1 addition & 1 deletion distributed/deploy/local.py
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,7 @@ def __init__(self, n_workers=None, threads_per_worker=None, processes=True,
silence_logs=logging.CRITICAL, diagnostics_port=8787,
services={}, worker_services={}, nanny=None, **worker_kwargs):
if nanny is not None:
warnings.warn("nanny has been deprecated, used processes=")
warnings.warning("nanny has been deprecated, used processes=")
processes = nanny
self.status = None
self.processes = processes
Expand Down
2 changes: 1 addition & 1 deletion distributed/deploy/tests/test_local.py
Original file line number Diff line number Diff line change
Expand Up @@ -218,7 +218,7 @@ def test_bokeh(loop):
assert time() < start + 20
sleep(0.01)

with pytest.raises(requests.ReadTimeout):
with pytest.raises(requests.RequestException):
requests.get(url, timeout=0.2)


Expand Down
4 changes: 2 additions & 2 deletions distributed/diagnostics/progressbar.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@

from .progress import format_time, Progress, MultiProgress

from ..compatibility import html_escape
from ..core import connect, coerce_to_address, CommClosedError
from ..client import default_client, futures_of
from ..protocol.pickle import dumps
Expand Down Expand Up @@ -227,13 +228,12 @@ def __init__(self, keys, scheduler=None, minimum=0, interval=0.1, func=key_split

def make_widget(self, all):
from ipywidgets import FloatProgress, HBox, VBox, HTML
import cgi
self.elapsed_time = HTML('')
self.bars = {key: FloatProgress(min=0, max=1, description='',
height='10px')
for key in all}
self.bar_texts = {key: HTML('', width = "140px") for key in all}
self.bar_labels = {key: HTML('<div style=\"padding: 0px 10px 0px 10px; text-align:left; word-wrap: break-word;\">' + cgi.escape(key.decode() if isinstance(key, bytes) else key) + '</div>')
self.bar_labels = {key: HTML('<div style=\"padding: 0px 10px 0px 10px; text-align:left; word-wrap: break-word;\">' + html_escape(key.decode() if isinstance(key, bytes) else key) + '</div>')
for key in all}

def key(kv):
Expand Down
2 changes: 1 addition & 1 deletion distributed/http/core.py
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ def resource_collect(pid=None):
return {'cpu_percent': psutil.cpu_percent(),
'status': p.status(),
'memory_percent': p.memory_percent(),
'memory_info_ex': p.memory_info_ex(),
'memory_info': p.memory_info(),
'disk_io_counters': metrics.disk_io_counters(),
'net_io_counters': metrics.net_io_counters()}

Expand Down
10 changes: 5 additions & 5 deletions distributed/nanny.py
Original file line number Diff line number Diff line change
Expand Up @@ -170,8 +170,8 @@ def _kill(self, comm=None, timeout=10):
self.worker_address)
except allowed_errors as e:
# Maybe the scheduler is gone, or it is unresponsive
logger.warn("Nanny %r failed to unregister worker %r: %s",
self.address, self.worker_address, e)
logger.warning("Nanny %r failed to unregister worker %r: %s",
self.address, self.worker_address, e)
except Exception as e:
logger.exception(e)

Expand Down Expand Up @@ -283,7 +283,7 @@ def _watch(self, wait_seconds=0.20):
yield self._close()
break
elif self.should_watch and self.process and not isalive(self.process):
logger.warn("Discovered failed worker")
logger.warning("Discovered failed worker")
self.cleanup()
try:
yield self.scheduler.unregister(address=self.worker_address)
Expand All @@ -294,7 +294,7 @@ def _watch(self, wait_seconds=0.20):
yield self._close()
break
if self.status != 'closed':
logger.warn('Restarting worker...')
logger.warning('Restarting worker...')
yield self.instantiate()
else:
yield gen.sleep(wait_seconds)
Expand Down Expand Up @@ -323,7 +323,7 @@ def resource_collect(self):
'cpu_percent': psutil.cpu_percent(),
'status': p.status(),
'memory_percent': p.memory_percent(),
'memory_info_ex': p.memory_info_ex()._asdict(),
'memory_info': p.memory_info()._asdict(),
'disk_io_counters': disk_io_counters()._asdict(),
'net_io_counters': net_io_counters()._asdict()}

Expand Down
10 changes: 5 additions & 5 deletions distributed/scheduler.py
Original file line number Diff line number Diff line change
Expand Up @@ -1212,7 +1212,7 @@ def handle_client(self, comm, client=None):
logger.exception(e)
raise
else:
logger.warn("Bad message: op=%s, %s", op, msg, exc_info=True)
logger.warning("Bad message: op=%s, %s", op, msg, exc_info=True)

if op == 'close':
breakout = True
Expand Down Expand Up @@ -1727,8 +1727,8 @@ def replicate(self, comm=None, keys=None, n=None, workers=None,
if v['status'] == 'OK':
self.add_keys(worker=w, keys=list(gathers[w]))
else:
logger.warn("Communication failed during replication: %s",
v)
logger.warning("Communication failed during replication: %s",
v)

self.log_event(w, {'action': 'replicate-add',
'keys': gathers[w]})
Expand Down Expand Up @@ -1789,8 +1789,8 @@ def workers_to_close(self, memory_ratio=2):
def retire_workers(self, comm=None, workers=None, remove=True, close=False,
close_workers=False):
if close:
logger.warn("The keyword close= has been deprecated. "
"Use close_workers= instead")
logger.warning("The keyword close= has been deprecated. "
"Use close_workers= instead")
close_workers = close_workers or close
with log_errors():
if workers is None:
Expand Down
27 changes: 19 additions & 8 deletions distributed/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@
import threading
import warnings

from .compatibility import cache_from_source, invalidate_caches, reload
from .compatibility import cache_from_source, getargspec, invalidate_caches, reload

try:
import resource
Expand Down Expand Up @@ -72,7 +72,7 @@ def has_arg(func, argname):
"""
while True:
try:
if argname in inspect.getargspec(func).args:
if argname in getargspec(func).args:
return True
except TypeError:
break
Expand Down Expand Up @@ -432,12 +432,23 @@ def truncate_exception(e, n=10000):
return e


def queue_to_iterator(q):
while True:
result = q.get()
if isinstance(result, StopIteration):
raise result
yield result
if sys.version_info >= (3,):
# (re-)raising StopIteration is deprecated in 3.6+
exec("""def queue_to_iterator(q):
while True:
result = q.get()
if isinstance(result, StopIteration):
return result.value
yield result
""")
else:
# Returning non-None from generator is a syntax error in 2.x
def queue_to_iterator(q):
while True:
result = q.get()
if isinstance(result, StopIteration):
raise result
yield result


def _dump_to_queue(seq, q):
Expand Down
Loading