77import signal
88import socket
99import sys
10+ import zmq
1011from typing import Any , Dict , Optional , Union
1112
1213import sglang as sgl
1314import uvloop
1415from sglang .srt .server_args import ServerArgs
15- from sglang .srt .utils import get_ip
16+ from sglang .srt .utils import get_ip , get_zmq_socket
1617from utils .protocol import DisaggPreprocessedRequest
1718from utils .sgl_utils import parse_sglang_args_inc
1819
@@ -42,6 +43,9 @@ def __init__(
4243 self .component = component
4344 self .metrics_publisher = WorkerMetricsPublisher ()
4445
46+ self .zmq_context = zmq .asyncio .Context ()
47+ self .receive_metrics_from_scheduler = None
48+
4549 if server_args .disaggregation_mode != "null" :
4650 self .bootstrap_host , self .bootstrap_port = self ._get_bootstrap_info ()
4751 if decode_client is None :
@@ -56,7 +60,13 @@ def __init__(
5660 logging .info ("Request handler initialized" )
5761
5862 def setup_metrics (self ):
59- """Set up metrics publisher - call this after handler creation"""
63+ """Set up metrics publisher"""
64+ self .receive_metrics_from_scheduler = get_zmq_socket (
65+ self .zmq_context , zmq .PULL , self .engine .port_args .metrics_ipc_name , True
66+ )
67+
68+ asyncio .create_task (self ._recieve_and_publish_metrics_loop ())
69+
6070 self .metrics_publisher .publish (
6171 request_active_slots = 0 ,
6272 request_total_slots = 1024 ,
@@ -75,22 +85,24 @@ async def create_metrics_publisher_endpoint(self):
7585 logging .debug ("Creating metrics publisher endpoint" )
7686 await self .metrics_publisher .create_endpoint (self .component )
7787
78- def _update_metrics (self ):
79- """Update metrics with current engine state"""
80- # TODO: remove this once the following upstream changes are merged:
81- # • sgl-project/sglang#6721 – "Expose runtime KV-cache & request metrics"
82- logging .warning (
83- "Publishing placeholder metrics in SGLangWorker; these are NOT real engine metrics yet and will be replaced once upstream support lands."
84- )
85- self .metrics_publisher .publish (
86- request_active_slots = 1 ,
87- request_total_slots = 100 ,
88- kv_active_blocks = random .randint (0 , 500 ),
89- kv_total_blocks = 1000 ,
90- num_requests_waiting = 0 ,
91- gpu_cache_usage_perc = random .uniform (0.1 , 0.8 ),
92- gpu_prefix_cache_hit_rate = random .uniform (0.0 , 0.5 ),
93- )
async def _receive_and_publish_metrics_loop(self, *, retry_delay: float = 1.0):
    """Forward scheduler metrics to the metrics publisher until cancelled.

    Each iteration awaits one object from
    ``self.receive_metrics_from_scheduler.recv_pyobj()`` and republishes its
    fields through ``self.metrics_publisher.publish``. ``data_parallel_rank``
    is optional on the received object and is published as ``None`` when the
    attribute is absent.

    Args:
        retry_delay: Seconds to pause after a failed iteration. Without a
            pause, an error raised before the first real suspension point
            (for example while the socket attribute is still ``None``) makes
            the loop spin without ever yielding to the event loop.

    Raises:
        asyncio.CancelledError: Propagated when the owning task is cancelled.
            It is not an ``Exception`` subclass on Python 3.8+, so the broad
            handler below does not swallow it.
    """
    while True:
        try:
            # NOTE(review): recv_pyobj unpickles whatever arrives on the
            # socket; this presumes only the trusted local scheduler can write
            # to the metrics IPC endpoint -- confirm.
            kv_metrics = await self.receive_metrics_from_scheduler.recv_pyobj()
            self.metrics_publisher.publish(
                request_active_slots=kv_metrics.request_active_slots,
                request_total_slots=kv_metrics.request_total_slots,
                kv_active_blocks=kv_metrics.kv_active_blocks,
                kv_total_blocks=kv_metrics.kv_total_blocks,
                num_requests_waiting=kv_metrics.num_requests_waiting,
                gpu_cache_usage_perc=kv_metrics.gpu_cache_usage_perc,
                gpu_prefix_cache_hit_rate=kv_metrics.gpu_prefix_cache_hit_rate,
                data_parallel_rank=getattr(kv_metrics, "data_parallel_rank", None),
            )
        except Exception:
            # Best-effort loop: log the failure, back off, then keep serving.
            logging.exception("Failed to receive or publish metrics")
            await asyncio.sleep(retry_delay)
        else:
            # Lazy %-formatting: the repr is only built when DEBUG is enabled.
            logging.debug("Published metrics: %s", kv_metrics)


# Compatibility alias: setup_metrics schedules this loop under the misspelled
# name ``_recieve_and_publish_metrics_loop``; without the alias that call
# raises AttributeError.
# TODO: correct the spelling at the call site and drop this alias.
_recieve_and_publish_metrics_loop = _receive_and_publish_metrics_loop
94106
95107 def _get_bootstrap_info (self ):
96108 """Bootstrap info from tokenizer manager"""
0 commit comments