Skip to content

Commit b33a651

Browse files
authored
Fix BrokerConnection.connection_delay() to return milliseconds (dpkp#1414)
1 parent 4c383da commit b33a651

File tree

2 files changed

+18
-2
lines changed

2 files changed

+18
-2
lines changed

kafka/conn.py

Lines changed: 9 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -594,9 +594,16 @@ def blacked_out(self):
594594
return False
595595

596596
def connection_delay(self):
597-
time_waited_ms = time.time() - (self.last_attempt or 0)
597+
"""
598+
Return the number of milliseconds to wait, based on the connection
599+
state, before attempting to send data. When disconnected, this respects
600+
the reconnect backoff time. When connecting, returns 0 to allow
601+
non-blocking connect to finish. When connected, returns a very large
602+
number to handle slow/stalled connections.
603+
"""
604+
time_waited = time.time() - (self.last_attempt or 0)
598605
if self.state is ConnectionStates.DISCONNECTED:
599-
return max(self._reconnect_backoff - time_waited_ms, 0)
606+
return max(self._reconnect_backoff - time_waited, 0) * 1000
600607
elif self.connecting():
601608
return 0
602609
else:

test/test_conn.py

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -72,6 +72,15 @@ def test_blacked_out(conn):
7272
assert conn.blacked_out() is True
7373

7474

75+
def test_connection_delay(conn):
76+
conn.last_attempt = time.time()
77+
assert round(conn.connection_delay()) == round(conn.config['reconnect_backoff_ms'])
78+
conn.state = ConnectionStates.CONNECTING
79+
assert conn.connection_delay() == 0
80+
conn.state = ConnectionStates.CONNECTED
81+
assert conn.connection_delay() == float('inf')
82+
83+
7584
def test_connected(conn):
7685
assert conn.connected() is False
7786
conn.state = ConnectionStates.CONNECTED

0 commit comments

Comments
 (0)