Skip to content

Commit 28b51c6

Browse files
committed
improve testing fixtures
1 parent a864e91 commit 28b51c6

File tree

2 files changed

+36
-189
lines changed

2 files changed

+36
-189
lines changed

components/backends/vllm/src/dynamo/tests/test_ports.py

Lines changed: 17 additions & 120 deletions
Original file line number | Diff line number | Diff line change
@@ -13,7 +13,6 @@
1313
DynamoPortRange,
1414
EtcdContext,
1515
PortAllocationRequest,
16-
PortBinding,
1716
PortMetadata,
1817
allocate_and_reserve_port,
1918
allocate_and_reserve_port_block,
@@ -50,19 +49,6 @@ def test_invalid_port_range_min_greater_than_max(self):
5049
DynamoPortRange(min=3000, max=3000)
5150

5251

53-
class TestEtcdContext:
54-
"""Test ETCD context functionality."""
55-
56-
def test_make_port_key(self):
57-
"""Test port key generation."""
58-
mock_client = Mock()
59-
context = EtcdContext(client=mock_client, namespace="test-ns")
60-
61-
with patch("socket.gethostname", return_value="test-host"):
62-
key = context.make_port_key(8080)
63-
assert key == "dyn://test-ns/ports/test-host/8080"
64-
65-
6652
class TestPortMetadata:
6753
"""Test port metadata functionality."""
6854

@@ -80,77 +66,6 @@ def test_to_etcd_value_with_block_info(self):
8066
assert value["block_start"] == 8080
8167

8268

83-
class TestPortBinding:
84-
"""Test PortBinding context manager."""
85-
86-
def test_single_port_binding(self):
87-
"""Test binding a single port."""
88-
# Find an available port
89-
with socket.socket() as s:
90-
s.bind(("", 0))
91-
port = s.getsockname()[1]
92-
93-
binding = PortBinding(port)
94-
assert binding.ports == [port]
95-
96-
# Test context manager
97-
with binding:
98-
# Port should be bound
99-
assert len(binding.sockets) == 1
100-
# Trying to bind again should fail
101-
with pytest.raises(OSError):
102-
with socket.socket() as s:
103-
s.bind(("", port))
104-
105-
# After context, sockets should be closed
106-
# We should be able to bind again
107-
with socket.socket() as s:
108-
s.bind(("", port))
109-
110-
def test_multiple_port_binding(self):
111-
"""Test binding multiple ports."""
112-
# Find available ports
113-
ports = []
114-
for _ in range(3):
115-
with socket.socket() as s:
116-
s.bind(("", 0))
117-
ports.append(s.getsockname()[1])
118-
119-
binding = PortBinding(ports)
120-
assert binding.ports == ports
121-
122-
with binding:
123-
assert len(binding.sockets) == 3
124-
# All ports should be bound
125-
for port in ports:
126-
with pytest.raises(OSError):
127-
with socket.socket() as s:
128-
s.bind(("", port))
129-
130-
def test_partial_binding_failure(self):
131-
"""Test cleanup when only some ports can be bound."""
132-
# Bind a port that we'll keep occupied
133-
blocker = socket.socket()
134-
blocker.bind(("", 0))
135-
blocked_port = blocker.getsockname()[1]
136-
137-
# Find another available port
138-
with socket.socket() as s:
139-
s.bind(("", 0))
140-
good_port = s.getsockname()[1]
141-
142-
# Try to bind both ports (one will fail)
143-
binding = PortBinding([good_port, blocked_port])
144-
145-
with pytest.raises(OSError):
146-
binding.__enter__()
147-
148-
# All sockets should be cleaned up
149-
assert all(sock._closed for sock in binding.sockets if hasattr(sock, "_closed"))
150-
151-
blocker.close()
152-
153-
15469
class TestHoldPorts:
15570
"""Test hold_ports context manager."""
15671

@@ -160,9 +75,11 @@ def test_hold_single_port(self):
16075
s.bind(("", 0))
16176
port = s.getsockname()[1]
16277

163-
with hold_ports(port) as binding:
164-
assert isinstance(binding, PortBinding)
165-
assert len(binding.sockets) == 1
78+
with hold_ports(port):
79+
assert not check_port_available(port)
80+
81+
# Port should be released after context exit
82+
assert check_port_available(port)
16683

16784
def test_hold_multiple_ports(self):
16885
"""Test holding multiple ports."""
@@ -172,31 +89,13 @@ def test_hold_multiple_ports(self):
17289
s.bind(("", 0))
17390
ports.append(s.getsockname()[1])
17491

175-
with hold_ports(ports) as binding:
176-
assert isinstance(binding, PortBinding)
177-
assert len(binding.sockets) == 2
178-
179-
180-
class TestCheckPortAvailable:
181-
"""Test check_port_available function."""
182-
183-
def test_port_is_available(self):
184-
"""Test checking an available port."""
185-
# Find an available port
186-
with socket.socket() as s:
187-
s.bind(("", 0))
188-
port = s.getsockname()[1]
189-
190-
assert check_port_available(port) is True
191-
192-
def test_port_is_not_available(self):
193-
"""Test checking an occupied port."""
194-
# Occupy a port
195-
with socket.socket() as blocker:
196-
blocker.bind(("", 0))
197-
port = blocker.getsockname()[1]
92+
with hold_ports(ports):
93+
for port in ports:
94+
assert not check_port_available(port)
19895

199-
assert check_port_available(port) is False
96+
# All ports should be released after context exit
97+
for port in ports:
98+
assert check_port_available(port)
20099

201100

202101
class TestReservePortInEtcd:
@@ -206,20 +105,18 @@ class TestReservePortInEtcd:
206105
async def test_reserve_port_success(self):
207106
"""Test successful port reservation in ETCD."""
208107
mock_client = AsyncMock()
209-
mock_client.primary_lease_id = Mock(
210-
return_value="test-lease-123"
211-
) # Regular Mock, not async
108+
mock_client.primary_lease_id = Mock(return_value="test-lease-123")
212109

213110
context = EtcdContext(client=mock_client, namespace="test-ns")
214111
metadata = PortMetadata(worker_id="test-worker", reason="test")
215112

216-
with patch("socket.gethostname", return_value="test-host"):
217-
await reserve_port_in_etcd(context, 8080, metadata)
113+
host_ip = get_host_ip()
114+
await reserve_port_in_etcd(context, 8080, metadata)
218115

219116
mock_client.kv_create.assert_called_once()
220117
call_args = mock_client.kv_create.call_args
221118

222-
assert call_args.kwargs["key"] == "dyn://test-ns/ports/test-host/8080"
119+
assert call_args.kwargs["key"] == f"dyn://test-ns/ports/{host_ip}/8080"
223120
assert call_args.kwargs["lease_id"] == "test-lease-123"
224121

225122
# Check the value is valid JSON
@@ -236,7 +133,7 @@ class TestAllocateAndReservePort:
236133
async def test_allocate_single_port_success(self):
237134
"""Test successful single port allocation."""
238135
mock_client = AsyncMock()
239-
mock_client.primary_lease_id.return_value = "test-lease"
136+
mock_client.primary_lease_id = Mock(return_value="test-lease")
240137

241138
context = EtcdContext(client=mock_client, namespace="test-ns")
242139
metadata = PortMetadata(worker_id="test-worker", reason="test")
@@ -264,7 +161,7 @@ class TestAllocateAndReservePortBlock:
264161
async def test_allocate_block_success(self):
265162
"""Test successful port block allocation."""
266163
mock_client = AsyncMock()
267-
mock_client.primary_lease_id.return_value = "test-lease"
164+
mock_client.primary_lease_id = Mock(return_value="test-lease")
268165

269166
context = EtcdContext(client=mock_client, namespace="test-ns")
270167
metadata = PortMetadata(worker_id="test-worker", reason="test")

components/backends/vllm/src/dynamo/vllm/ports.py

Lines changed: 19 additions & 69 deletions
Original file line number | Diff line number | Diff line change
@@ -19,26 +19,6 @@
1919
DEFAULT_DYNAMO_PORT_MIN = 20000
2020
DEFAULT_DYNAMO_PORT_MAX = 30000
2121

22-
__all__ = [
23-
# Constants
24-
"DEFAULT_DYNAMO_PORT_MIN",
25-
"DEFAULT_DYNAMO_PORT_MAX",
26-
# Classes
27-
"DynamoPortRange",
28-
"EtcdContext",
29-
"PortMetadata",
30-
"PortAllocationRequest",
31-
"PortBinding",
32-
# Functions
33-
"check_port_available",
34-
"reserve_port_in_etcd",
35-
"allocate_and_reserve_port_block",
36-
"allocate_and_reserve_port",
37-
"get_host_ip",
38-
# Context managers
39-
"hold_ports",
40-
]
41-
4222

4323
@dataclass
4424
class DynamoPortRange:
@@ -67,8 +47,8 @@ class EtcdContext:
6747

6848
def make_port_key(self, port: int) -> str:
6949
"""Generate ETCD key for a port reservation"""
70-
node_name = socket.gethostname()
71-
return f"dyn://{self.namespace}/ports/{node_name}/{port}"
50+
node_ip = get_host_ip()
51+
return f"dyn://{self.namespace}/ports/{node_ip}/{port}"
7252

7353

7454
@dataclass
@@ -103,64 +83,38 @@ class PortAllocationRequest:
10383
max_attempts: int = 100
10484

10585

106-
class PortBinding:
107-
"""Holds socket bindings to ensure exclusive access to ports during reservation.
108-
109-
Can handle a single port or multiple ports.
110-
"""
111-
112-
def __init__(self, ports: int | list[int]):
113-
"""Initialize with a single port or list of ports."""
114-
if isinstance(ports, int):
115-
self.ports = [ports]
116-
else:
117-
self.ports = ports
118-
self.sockets = []
119-
120-
def __enter__(self):
121-
"""Bind to all ports."""
122-
try:
123-
for port in self.ports:
124-
sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
125-
sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
126-
sock.bind(("", port))
127-
self.sockets.append(sock)
128-
return self
129-
except OSError:
130-
# Clean up any sockets we managed to bind
131-
for sock in self.sockets:
132-
sock.close()
133-
raise
134-
135-
def __exit__(self, exc_type, exc_val, exc_tb):
136-
"""Release all port bindings."""
137-
for sock in self.sockets:
138-
sock.close()
139-
140-
14186
@contextmanager
14287
def hold_ports(ports: int | list[int]):
14388
"""Context manager to hold port binding(s).
14489
90+
Holds socket bindings to ensure exclusive access to ports during reservation.
91+
Can handle a single port or multiple ports.
92+
14593
Args:
14694
ports: Single port number or list of port numbers to hold
147-
148-
Yields:
149-
PortBinding: The binding object holding the port(s)
15095
"""
151-
binding = PortBinding(ports)
96+
if isinstance(ports, int):
97+
ports = [ports]
98+
99+
sockets = []
152100
try:
153-
binding.__enter__()
154-
yield binding
101+
for port in ports:
102+
sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
103+
sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
104+
sock.bind(("", port))
105+
sockets.append(sock)
106+
107+
yield
108+
155109
finally:
156-
binding.__exit__(None, None, None)
110+
for sock in sockets:
111+
sock.close()
157112

158113

159114
def check_port_available(port: int) -> bool:
160115
"""Check if a specific port is available for binding."""
161116
try:
162117
with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as sock:
163-
sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
164118
sock.bind(("", port))
165119
return True
166120
except OSError:
@@ -211,7 +165,6 @@ async def allocate_and_reserve_port_block(request: PortAllocationRequest) -> lis
211165
available_start_ports = list(range(request.port_range.min, max_start_port + 1))
212166
random.shuffle(available_start_ports)
213167

214-
# Limit attempts to the number of valid starting ports or max_attempts
215168
actual_max_attempts = min(len(available_start_ports), request.max_attempts)
216169

217170
for attempt in range(1, actual_max_attempts + 1):
@@ -227,7 +180,6 @@ async def allocate_and_reserve_port_block(request: PortAllocationRequest) -> lis
227180

228181
# We have exclusive access to these ports, now reserve them in ETCD
229182
for i, port in enumerate(ports_to_reserve):
230-
# Create metadata for this specific port in the block
231183
port_metadata = PortMetadata(
232184
worker_id=f"{request.metadata.worker_id}-{i}"
233185
if request.block_size > 1
@@ -255,12 +207,10 @@ async def allocate_and_reserve_port_block(request: PortAllocationRequest) -> lis
255207
return ports_to_reserve
256208

257209
except OSError as e:
258-
# Failed to bind to ports, they're in use
259210
logger.debug(
260211
f"Failed to bind to port block starting at {start_port} (attempt {attempt}): {e}"
261212
)
262213
except Exception as e:
263-
# Failed to reserve in ETCD (but we had the ports bound)
264214
logger.debug(
265215
f"Failed to reserve port block starting at {start_port} in ETCD (attempt {attempt}): {e}"
266216
)

0 commit comments

Comments
 (0)