Fix DeprecationWarnings and ResourceWarnings
Recent py.test versions display Python warnings at the end of a test run,
which helps us chase them down.
pitrou committed May 23, 2017
commit 2e5f617d4626cb997614b5dc28b5920b14d4d2d5
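For local debugging it can also help to fail fast on the same warning categories instead of waiting for the end-of-run summary. A minimal sketch (an illustration and assumption, not part of this commit) using Python's own warnings filters in a conftest.py:

    # conftest.py -- hypothetical local helper, not included in this commit
    import warnings

    # Escalate the categories this commit chases into hard errors so they
    # surface immediately rather than only in the test summary.
    warnings.simplefilter("error", DeprecationWarning)
    try:
        warnings.simplefilter("error", ResourceWarning)  # Python 3 only
    except NameError:
        pass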
4 changes: 2 additions & 2 deletions distributed/bokeh/background/server_lifecycle.py
@@ -41,7 +41,7 @@ def http_get(route):
         logger.info("Can not connect to %s", url, exc_info=True)
         return
     except HTTPError:
-        logger.warn("http route %s failed", route)
+        logger.warning("http route %s failed", route)
         return
     msg = json.loads(response.body.decode())
     messages[route]['deque'].append(msg)
@@ -58,7 +58,7 @@ def workers():
         response = yield client.fetch(
                 'http://%(host)s:%(http-port)d/workers.json' % options)
     except HTTPError:
-        logger.warn("workers http route failed")
+        logger.warning("workers http route failed")
         return
     msg = json.loads(response.body.decode())
     if msg:
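Most hunks below are this same mechanical substitution: Logger.warn is a deprecated alias for Logger.warning, and recent Python 3 releases emit a DeprecationWarning for it. A minimal sketch of the difference (illustration only, not from the diff):

    import logging

    logging.basicConfig()
    logger = logging.getLogger("distributed.example")  # hypothetical name

    logger.warn("old spelling")     # works, but warns as deprecated on Python 3
    logger.warning("new spelling")  # preferred; otherwise identical behaviour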
2 changes: 1 addition & 1 deletion distributed/cli/dask_scheduler.py
@@ -149,7 +149,7 @@ def del_pid_file():
         except ImportError:
             logger.info("Please install Bokeh to get Web UI")
         except Exception as e:
-            logger.warn("Could not start Bokeh web UI", exc_info=True)
+            logger.warning("Could not start Bokeh web UI", exc_info=True)

     logger.info('Local Directory: %26s', local_directory)
     logger.info('-' * 47)
6 changes: 3 additions & 3 deletions distributed/client.py
@@ -622,7 +622,7 @@ def _handle_report(self):
                 msgs = yield self.scheduler_comm.comm.read()
             except CommClosedError:
                 if self.status == 'running':
-                    logger.warn("Client report stream closed to scheduler")
+                    logger.warning("Client report stream closed to scheduler")
                     logger.info("Reconnecting...")
                     self.status = 'connecting'
                     yield self._reconnect()
@@ -696,7 +696,7 @@ def _handle_restart(self):
         self._restart_event.set()

     def _handle_error(self, exception=None):
-        logger.warn("Scheduler exception:")
+        logger.warning("Scheduler exception:")
         logger.exception(exception)

     @gen.coroutine
@@ -1032,7 +1032,7 @@ def wait(k):
             response = yield self.scheduler.gather(keys=keys)

             if response['status'] == 'error':
-                logger.warn("Couldn't gather keys %s", response['keys'])
+                logger.warning("Couldn't gather keys %s", response['keys'])
                 for key in response['keys']:
                     self._send_to_scheduler({'op': 'report-key',
                                              'key': key})
2 changes: 1 addition & 1 deletion distributed/comm/inproc.py
@@ -157,7 +157,7 @@ def __init__(self, peer_addr, read_q, write_q, write_loop,
     def _get_finalizer(self):
         def finalize(write_q=self._write_q, write_loop=self._write_loop,
                      r=repr(self)):
-            logger.warn("Closing dangling queue in %s" % (r,))
+            logger.warning("Closing dangling queue in %s" % (r,))
             write_loop.add_callback(write_q.put_nowait, _EOF)

         return finalize
10 changes: 5 additions & 5 deletions distributed/comm/tcp.py
@@ -91,7 +91,7 @@ def set_tcp_timeout(stream):
             TCP_USER_TIMEOUT = 18  # since Linux 2.6.37
             sock.setsockopt(socket.SOL_TCP, TCP_USER_TIMEOUT, timeout * 1000)
         except EnvironmentError as e:
-            logger.warn("Could not set timeout on TCP stream: %s", e)
+            logger.warning("Could not set timeout on TCP stream: %s", e)


 def convert_stream_closed_error(exc):
@@ -129,7 +129,7 @@ def _read_extra(self):
     def _get_finalizer(self):
         def finalize(stream=self.stream, r=repr(self)):
             if not stream.closed():
-                logger.warn("Closing dangling stream in %s" % (r,))
+                logger.warning("Closing dangling stream in %s" % (r,))
                 stream.close()

         return finalize
@@ -392,9 +392,9 @@ def handle_stream(self, stream, address):
             yield stream.wait_for_handshake()
         except EnvironmentError as e:
             # The handshake went wrong, log and ignore
-            logger.warn("listener on %r: TLS handshake failed with remote %r: %s",
-                        self.listen_address, address,
-                        getattr(e, "real_error", None) or e)
+            logger.warning("listener on %r: TLS handshake failed with remote %r: %s",
+                           self.listen_address, address,
+                           getattr(e, "real_error", None) or e)
         else:
             comm = TLS(stream, address, self.deserialize)
             self.comm_handler(comm)
2 changes: 2 additions & 0 deletions distributed/comm/tests/test_comms.py
@@ -406,6 +406,8 @@ def run():
             main_loop.add_callback(fut.set_exc_info, sys.exc_info())
         else:
             main_loop.add_callback(fut.set_result, res)
+        finally:
+            thread_loop.close()

     t = threading.Thread(target=run)
     t.start()
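The added thread_loop.close() addresses a ResourceWarning: a Tornado IOLoop created for a helper thread holds file descriptors that Python 3 reports as leaked if the loop is garbage-collected without being closed. A rough sketch of the pattern under assumed names (coro stands in for the real work):

    import threading

    from tornado import gen
    from tornado.ioloop import IOLoop


    @gen.coroutine
    def coro():
        yield gen.moment  # stand-in for the real coroutine under test


    def run():
        thread_loop = IOLoop()          # fresh loop owned by this thread
        try:
            thread_loop.run_sync(coro)  # do the work
        finally:
            thread_loop.close()         # release fds; avoids ResourceWarning

    t = threading.Thread(target=run)
    t.start()
    t.join()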
3 changes: 3 additions & 0 deletions distributed/compatibility.py
@@ -7,6 +7,8 @@
     from Queue import Queue, Empty
     from io import BytesIO
     from thread import get_ident as get_thread_identity
+    from inspect import getargspec
+
     reload = reload # flake8: noqa
     unicode = unicode # flake8: noqa
     PY2 = True
@@ -51,6 +53,7 @@ def cache_from_source(path):
     from threading import get_ident as get_thread_identity
     from importlib import invalidate_caches
     from importlib.util import cache_from_source
+    from inspect import getfullargspec as getargspec

     PY2 = False
     PY3 = True
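The new getargspec alias papers over an inspect deprecation: on recent Python 3 releases inspect.getargspec warns and cannot handle keyword-only arguments, while getfullargspec can. A small sketch of the difference (illustration only, Python 3, not from the diff):

    import inspect


    def f(a, *, b=1):          # keyword-only argument, Python 3 syntax
        return a + b


    # inspect.getargspec(f) emits a DeprecationWarning and raises ValueError
    # here because of the keyword-only argument; getfullargspec copes:
    spec = inspect.getfullargspec(f)
    print(spec.args)        # ['a']
    print(spec.kwonlyargs)  # ['b']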
5 changes: 3 additions & 2 deletions distributed/config.py
@@ -48,8 +48,9 @@ def determine_config_file():
         try:
             ensure_config_file(default_path, path)
         except EnvironmentError as e:
-            logger.warn("Could not write default config file to '%s'. Received error %s",
-                        path, e)
+            warnings.warn("Could not write default config file to '%s'. "
+                          "Received error %s" % (path, e),
+                          UserWarning)

     return path if os.path.exists(path) else default_path

12 changes: 6 additions & 6 deletions distributed/core.py
@@ -241,15 +241,15 @@ def handle_comm(self, comm, shutting_down=shutting_down):
                     handler = self.handlers[op]
                 except KeyError:
                     result = "No handler found: %s" % op
-                    logger.warn(result, exc_info=True)
+                    logger.warning(result, exc_info=True)
                 else:
                     logger.debug("Calling into handler %s", handler.__name__)
                     try:
                         result = handler(comm, **msg)
                         if type(result) is gen.Future:
                             result = yield result
                     except CommClosedError as e:
-                        logger.warn("Lost connection to %r: %s", address, e)
+                        logger.warning("Lost connection to %r: %s", address, e)
                         break
                     except Exception as e:
                         logger.exception(e)
@@ -258,8 +258,8 @@ def handle_comm(self, comm, shutting_down=shutting_down):
                     try:
                         yield comm.write(result)
                     except EnvironmentError as e:
-                        logger.warn("Lost connection to %r while sending result for op %r: %s",
-                                    address, op, e)
+                        logger.warning("Lost connection to %r while sending result for op %r: %s",
+                                       address, op, e)
                         break
                 msg = result = None
                 if close_desired:
@@ -438,8 +438,8 @@ def __del__(self):
             self.status = 'closed'
             still_open = [comm for comm in self.comms if not comm.closed()]
             if still_open:
-                logger.warn("rpc object %s deleted with %d open comms",
-                            self, len(still_open))
+                logger.warning("rpc object %s deleted with %d open comms",
+                               self, len(still_open))
                 for comm in still_open:
                     comm.abort()

2 changes: 1 addition & 1 deletion distributed/deploy/local.py
@@ -63,7 +63,7 @@ def __init__(self, n_workers=None, threads_per_worker=None, processes=True,
                  silence_logs=logging.CRITICAL, diagnostics_port=8787,
                  services={}, worker_services={}, nanny=None, **worker_kwargs):
         if nanny is not None:
-            warnings.warn("nanny has been deprecated, used processes=")
+            warnings.warning("nanny has been deprecated, used processes=")
             processes = nanny
         self.status = None
         self.processes = processes
8 changes: 4 additions & 4 deletions distributed/nanny.py
@@ -170,8 +170,8 @@ def _kill(self, comm=None, timeout=10):
                                                self.worker_address)
             except allowed_errors as e:
                 # Maybe the scheduler is gone, or it is unresponsive
-                logger.warn("Nanny %r failed to unregister worker %r: %s",
-                            self.address, self.worker_address, e)
+                logger.warning("Nanny %r failed to unregister worker %r: %s",
+                               self.address, self.worker_address, e)
             except Exception as e:
                 logger.exception(e)

@@ -283,7 +283,7 @@ def _watch(self, wait_seconds=0.20):
                 yield self._close()
                 break
             elif self.should_watch and self.process and not isalive(self.process):
-                logger.warn("Discovered failed worker")
+                logger.warning("Discovered failed worker")
                 self.cleanup()
                 try:
                     yield self.scheduler.unregister(address=self.worker_address)
@@ -294,7 +294,7 @@ def _watch(self, wait_seconds=0.20):
                     yield self._close()
                     break
                 if self.status != 'closed':
-                    logger.warn('Restarting worker...')
+                    logger.warning('Restarting worker...')
                     yield self.instantiate()
             else:
                 yield gen.sleep(wait_seconds)
10 changes: 5 additions & 5 deletions distributed/scheduler.py
@@ -1212,7 +1212,7 @@ def handle_client(self, comm, client=None):
                         logger.exception(e)
                         raise
                 else:
-                    logger.warn("Bad message: op=%s, %s", op, msg, exc_info=True)
+                    logger.warning("Bad message: op=%s, %s", op, msg, exc_info=True)

                 if op == 'close':
                     breakout = True
@@ -1727,8 +1727,8 @@ def replicate(self, comm=None, keys=None, n=None, workers=None,
                 if v['status'] == 'OK':
                     self.add_keys(worker=w, keys=list(gathers[w]))
                 else:
-                    logger.warn("Communication failed during replication: %s",
-                                v)
+                    logger.warning("Communication failed during replication: %s",
+                                   v)

                 self.log_event(w, {'action': 'replicate-add',
                                    'keys': gathers[w]})
@@ -1789,8 +1789,8 @@ def workers_to_close(self, memory_ratio=2):
     def retire_workers(self, comm=None, workers=None, remove=True, close=False,
                        close_workers=False):
         if close:
-            logger.warn("The keyword close= has been deprecated. "
-                        "Use close_workers= instead")
+            logger.warning("The keyword close= has been deprecated. "
+                           "Use close_workers= instead")
             close_workers = close_workers or close
         with log_errors():
             if workers is None:
27 changes: 19 additions & 8 deletions distributed/utils.py
@@ -20,7 +20,7 @@
 import threading
 import warnings

-from .compatibility import cache_from_source, invalidate_caches, reload
+from .compatibility import cache_from_source, getargspec, invalidate_caches, reload

 try:
     import resource
@@ -72,7 +72,7 @@ def has_arg(func, argname):
     """
     while True:
         try:
-            if argname in inspect.getargspec(func).args:
+            if argname in getargspec(func).args:
                 return True
         except TypeError:
             break
@@ -432,12 +432,23 @@ def truncate_exception(e, n=10000):
         return e


-def queue_to_iterator(q):
-    while True:
-        result = q.get()
-        if isinstance(result, StopIteration):
-            raise result
-        yield result
+if sys.version_info >= (3,):
+    # (re-)raising StopIteration is deprecated in 3.6+
+    exec("""def queue_to_iterator(q):
+    while True:
+        result = q.get()
+        if isinstance(result, StopIteration):
+            return result.value
+        yield result
+""")
+else:
+    # Returning non-None from generator is a syntax error in 2.x
+    def queue_to_iterator(q):
+        while True:
+            result = q.get()
+            if isinstance(result, StopIteration):
+                raise result
+            yield result


 def _dump_to_queue(seq, q):
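The queue_to_iterator split works around PEP 479: raising StopIteration inside a generator is deprecated on Python 3.5/3.6 and becomes a RuntimeError once generator_stop semantics are the default, so the Python 3 branch returns the carried value instead. A compressed illustration of the rule (Python 3, not from the diff):

    from __future__ import generator_stop  # PEP 479 semantics on 3.5/3.6


    def old_style():
        yield 1
        raise StopIteration        # deprecated; surfaces as RuntimeError here


    def new_style():
        yield 1
        return                     # the sanctioned way to finish a generator


    print(list(new_style()))       # [1]
    try:
        list(old_style())
    except RuntimeError as e:
        print("PEP 479:", e)       # generator raised StopIteration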
15 changes: 6 additions & 9 deletions distributed/utils_test.py
@@ -608,16 +608,13 @@ def popen(*args, **kwargs):
             terminate_process(proc)
         finally:
             # XXX Also dump stdout if return code != 0 ?
+            out, err = proc.communicate()
             if dump_stdout:
-                line = '\n\nPrint from stderr\n=================\n'
-                while line:
-                    print(line, end='')
-                    line = proc.stderr.readline()
-
-                line = '\n\nPrint from stdout\n=================\n'
-                while line:
-                    print(line, end='')
-                    line = proc.stdout.readline()
+                print('\n\nPrint from stderr\n=================\n')
+                print(err)
+
+                print('\n\nPrint from stdout\n=================\n')
+                print(out)


 def wait_for_port(address, timeout=5):
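The popen cleanup swaps manual readline loops for Popen.communicate(), which waits for the child, drains both pipes to EOF and closes them, so Python 3 no longer reports the pipe file objects as leaked. A standalone sketch of the same pattern (the child command is made up for illustration):

    import subprocess
    import sys

    # Hypothetical child process; any command writing to both streams works.
    proc = subprocess.Popen(
        [sys.executable, '-c',
         'import sys; print("out"); sys.stderr.write("err\\n")'],
        stdout=subprocess.PIPE, stderr=subprocess.PIPE)

    out, err = proc.communicate()   # waits, drains and closes both pipes
    print(out.decode())
    print(err.decode())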
12 changes: 6 additions & 6 deletions distributed/worker.py
@@ -55,7 +55,7 @@
     import psutil
     TOTAL_MEMORY = psutil.virtual_memory().total
 except ImportError:
-    logger.warn("Please install psutil to estimate worker memory use")
+    logger.warning("Please install psutil to estimate worker memory use")
     TOTAL_MEMORY = 8e9
     psutil = None

@@ -551,8 +551,8 @@ def gather(self, comm=None, who_has=None):
         result, missing_keys, missing_workers = yield gather_from_workers(
             who_has, rpc=self.rpc)
         if missing_keys:
-            logger.warn("Could not find data: %s on workers: %s (who_has: %s)",
-                        missing_keys, missing_workers, who_has)
+            logger.warning("Could not find data: %s on workers: %s (who_has: %s)",
+                           missing_keys, missing_workers, who_has)
             raise Return({'status': 'missing-data',
                           'keys': missing_keys})
         else:
@@ -753,7 +753,7 @@ def run(server, comm, function, args=(), kwargs={}, is_coro=False, wait=True):
         if is_coro:
             result = (yield result) if wait else None
     except Exception as e:
-        logger.warn(" Run Failed\n"
+        logger.warning(" Run Failed\n"
                     "Function: %s\n"
                     "args: %s\n"
                     "kwargs: %s\n",
@@ -1113,7 +1113,7 @@ def add_task(self, key, function=None, args=None, kwargs=None, task=None,
                 if stop - start > 0.010:
                     self.startstops[key].append(('deserialize', start, stop))
             except Exception as e:
-                logger.warn("Could not deserialize task", exc_info=True)
+                logger.warning("Could not deserialize task", exc_info=True)
                 emsg = error_message(e)
                 emsg['key'] = key
                 emsg['op'] = 'task-erred'
@@ -1891,7 +1891,7 @@ def execute(self, key, report=False):
             else:
                 self.exceptions[key] = result['exception']
                 self.tracebacks[key] = result['traceback']
-                logger.warn(" Compute Failed\n"
+                logger.warning(" Compute Failed\n"
                             "Function: %s\n"
                             "args: %s\n"
                             "kwargs: %s\n"