35 changes: 35 additions & 0 deletions tests/integration/defs/disaggregated/test_configs/disagg_config_cache_aware_balance_deepseek_v3.yaml
@@ -0,0 +1,35 @@
hostname: localhost
port: 8000
model: DeepSeek-V3-Lite/bf16
backend: "pytorch"
use_cuda_graph: False
disable_overlap_scheduler: True
autotuner_enabled: False
context_servers:
  num_instances: 2
  router:
    type: kv_cache_aware
  tensor_parallel_size: 1
  pipeline_parallel_size: 1
  kv_cache_config:
    enable_block_reuse: True
    enable_partial_reuse: True
    event_buffer_max_size: 1024
    free_gpu_memory_fraction: 0.1
  urls:
    - "localhost:8001"
    - "localhost:8002"
generation_servers:
  num_instances: 2
  router:
    type: kv_cache_aware
  tensor_parallel_size: 1
  pipeline_parallel_size: 1
  kv_cache_config:
    enable_block_reuse: True
    enable_partial_reuse: True
    event_buffer_max_size: 1024
    free_gpu_memory_fraction: 0.1
  urls:
    - "localhost:8003"
    - "localhost:8004"
31 changes: 31 additions & 0 deletions tests/integration/defs/disaggregated/test_configs/disagg_config_cache_reuse_deepseek_v3.yaml
@@ -0,0 +1,31 @@
hostname: localhost
port: 8000
model: DeepSeek-V3-Lite/bf16
free_gpu_memory_fraction: 0.15
backend: "pytorch"
use_cuda_graph: False
disable_overlap_scheduler: True
autotuner_enabled: False
context_servers:
  num_instances: 1
  tensor_parallel_size: 1
  pipeline_parallel_size: 1
  kv_cache_config:
    enable_block_reuse: True
    enable_partial_reuse: True
    event_buffer_max_size: 1024
  urls:
    - "localhost:8001"
generation_servers:
  num_instances: 1
  tensor_parallel_size: 1
  pipeline_parallel_size: 1
  router:
    type: kv_cache_aware
  kv_cache_config:
    enable_block_reuse: True
    enable_partial_reuse: True
    event_buffer_max_size: 1024
    free_gpu_memory_fraction: 0.05
  urls:
    - "localhost:8002"
34 changes: 34 additions & 0 deletions tests/integration/defs/disaggregated/test_configs/disagg_config_conditional_deepseek_v3.yaml
@@ -0,0 +1,34 @@
hostname: localhost
port: 8000
model: DeepSeek-V3-Lite/bf16
backend: "pytorch"
free_gpu_memory_fraction: 0.15
conditional_disagg_config:
  max_local_prefill_length: 100
use_cuda_graph: False
disable_overlap_scheduler: True
autotuner_enabled: False
context_servers:
  num_instances: 1
  tensor_parallel_size: 1
  pipeline_parallel_size: 1
  kv_cache_config:
    enable_block_reuse: True
    enable_partial_reuse: True
    event_buffer_max_size: 1024
    free_gpu_memory_fraction: 0.15
  urls:
    - "localhost:8001"
generation_servers:
  num_instances: 1
  tensor_parallel_size: 1
  pipeline_parallel_size: 1
  router:
    type: kv_cache_aware
  kv_cache_config:
    enable_block_reuse: True
    enable_partial_reuse: True
    event_buffer_max_size: 1024
    free_gpu_memory_fraction: 0.15
  urls:
    - "localhost:8002"
48 changes: 48 additions & 0 deletions tests/integration/defs/disaggregated/test_disaggregated.py
@@ -100,6 +100,12 @@ def get_test_config(test_desc, example_dir, test_root):
        (2,
         f"{test_configs_root}/disagg_config_ctxtp1_gentp1_deepseek_v3_lite_one_mtp_attention_dp_overlap.yaml"
         ),
        "deepseek_v3_lite_bf16_cache_aware_balance":
        (4,
         f"{test_configs_root}/disagg_config_cache_aware_balance_deepseek_v3.yaml"
         ),
        "deepseek_v3_lite_bf16_conditional":
        (2, f"{test_configs_root}/disagg_config_conditional_deepseek_v3.yaml"),
    }

    if test_desc not in config_map:
@@ -757,3 +763,45 @@ def test_disaggregated_deepseek_v3_lite_fp8_tp1_attention_dp_overlap_one_mtp(
"deepseek_v3_lite_fp8_tp1_attention_dp_overlap_one_mtp",
env=llm_venv._new_env,
cwd=llm_venv.get_working_directory())


@skip_no_hopper
@pytest.mark.parametrize("deepseek_v3_model_root", ['DeepSeek-V3-Lite-bf16'],
indirect=True)
def test_disaggregated_deepseek_v3_lite_bf16_cache_aware_balance(
disaggregated_test_root, disaggregated_example_root, llm_venv,
deepseek_v3_model_root):
src_dst_dict = {
deepseek_v3_model_root:
f"{llm_venv.get_working_directory()}/DeepSeek-V3-Lite/bf16",
}
for src, dst in src_dst_dict.items():
if not os.path.islink(dst):
os.makedirs(os.path.dirname(dst), exist_ok=True)
os.symlink(src, dst, target_is_directory=True)

run_disaggregated_test(disaggregated_example_root,
"deepseek_v3_lite_bf16_cache_aware_balance",
env=llm_venv._new_env,
cwd=llm_venv.get_working_directory())


@skip_no_hopper
@pytest.mark.parametrize("deepseek_v3_model_root", ['DeepSeek-V3-Lite-bf16'],
indirect=True)
def test_disaggregated_deepseek_v3_lite_bf16_conditional(
disaggregated_test_root, disaggregated_example_root, llm_venv,
deepseek_v3_model_root):
src_dst_dict = {
deepseek_v3_model_root:
f"{llm_venv.get_working_directory()}/DeepSeek-V3-Lite/bf16",
}
for src, dst in src_dst_dict.items():
if not os.path.islink(dst):
os.makedirs(os.path.dirname(dst), exist_ok=True)
os.symlink(src, dst, target_is_directory=True)

run_disaggregated_test(disaggregated_example_root,
"deepseek_v3_lite_bf16_conditional",
env=llm_venv._new_env,
cwd=llm_venv.get_working_directory())
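Both new tests (and the two added to `test_workers.py` below) repeat the same symlink setup so the model resolves at the `DeepSeek-V3-Lite/bf16` path the YAML configs reference. A hypothetical shared helper capturing the idiom (not part of this PR):

```python
import os


def ensure_model_symlink(src: str, dst: str) -> None:
    """Link a downloaded model root into the working directory."""
    if not os.path.islink(dst):
        os.makedirs(os.path.dirname(dst), exist_ok=True)
        os.symlink(src, dst, target_is_directory=True)
```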
97 changes: 81 additions & 16 deletions tests/integration/defs/disaggregated/test_workers.py
@@ -9,6 +9,9 @@
import aiohttp
import pytest
import yaml
from defs.conftest import skip_no_hopper
from defs.disaggregated.test_disaggregated_single_gpu import \
    model_path as get_model_path
from defs.trt_test_alternative import popen
from transformers import AutoTokenizer

@@ -19,8 +22,6 @@
                                       KvCacheAwareServerState, ServerRole,
                                       block_key_hasher)

MODEL_NAME = "TinyLlama/TinyLlama-1.1B-Chat-v1.0"


def get_ctx_gen_server_urls_from_cfg(config_file: str):
    with open(config_file, 'r') as file:
@@ -184,15 +185,17 @@ def __init__(self,
                 ctx_servers: List[str],
                 gen_servers: List[str],
                 req_timeout_secs: int = 180,
                 server_start_timeout_secs: int = 180):
                 server_start_timeout_secs: int = 180,
                 model_name: str = "TinyLlama/TinyLlama-1.1B-Chat-v1.0"):
        super().__init__(ctx_servers, gen_servers, req_timeout_secs,
                         server_start_timeout_secs)
        self.model_name = model_name

    async def multi_round_request(self, session: aiohttp.ClientSession,
                                  init_prompt: str, max_rounds: int,
                                  threshold: float):
        request = {
            "model": MODEL_NAME,
            "model": self.model_name,
            "prompt": init_prompt,
            "max_tokens": 10,
            "ignore_eos": True,
@@ -235,10 +238,15 @@ def __init__(self,
                 ctx_servers: List[str],
                 gen_servers: List[str],
                 req_timeout_secs: int = 180,
                 server_start_timeout_secs: int = 240):
                 server_start_timeout_secs: int = 240,
                 model_name: str = "TinyLlama/TinyLlama-1.1B-Chat-v1.0",
                 model_path: Optional[str] = None):
        super().__init__(ctx_servers, gen_servers, req_timeout_secs,
                         server_start_timeout_secs)
        self.tokenizer = AutoTokenizer.from_pretrained(MODEL_NAME)
        if model_path is None:
            model_path = get_model_path(model_name)
        self.tokenizer = AutoTokenizer.from_pretrained(model_path)
        self.model_name = model_name
        self.kv_cache_block_maps: dict[str, KvCacheAwareServerState] = {}
        self.kv_cache_event_maps: dict[str, list[dict]] = {}
        for ctx_server in ctx_servers:
@@ -266,7 +274,7 @@ async def multi_round_request(self,
                                  max_rounds: int,
                                  check_match_count: bool = True):
        request = {
            "model": MODEL_NAME,
            "model": self.model_name,
            "prompt": init_prompt,
            "max_tokens": 64,
            "ignore_eos": True,
@@ -347,21 +355,26 @@ def __init__(self,
                 ctx_servers: List[str],
                 gen_servers: List[str],
                 req_timeout_secs: int = 180,
                 server_start_timeout_secs: int = 180):
                 server_start_timeout_secs: int = 180,
                 model_name: str = "TinyLlama/TinyLlama-1.1B-Chat-v1.0",
                 tokens_per_block: int = 32):
        super().__init__(ctx_servers, gen_servers, req_timeout_secs,
                         server_start_timeout_secs)
        self.ctx_router = KvCacheAwareRouter(server_role=ServerRole.CONTEXT,
                                             servers=ctx_servers)
                                             servers=ctx_servers,
                                             tokens_per_block=tokens_per_block)
        self.gen_router = KvCacheAwareRouter(server_role=ServerRole.GENERATION,
                                             servers=gen_servers)
                                             servers=gen_servers,
                                             tokens_per_block=tokens_per_block)
        self.model_name = model_name

    async def multi_round_request(self,
                                  session: aiohttp.ClientSession,
                                  init_prompt: str,
                                  max_rounds: int = 8,
                                  check_server_match: bool = True):
        request = {
            "model": MODEL_NAME,
            "model": self.model_name,
            "prompt": init_prompt,
            "max_tokens": 64,
            "ignore_eos": True,
@@ -373,7 +386,7 @@ async def multi_round_request(self,
        gen_match = 0
        for i in range(max_rounds):
            openai_request = CompletionRequest(
                model=MODEL_NAME,
                model=self.model_name,
                prompt=request["prompt"],
                disaggregated_params=DisaggregatedParams(
                    request_type="context_only"))
@@ -425,7 +438,7 @@ async def test_eviction(self):
        async with await self.new_session() as session:
            # send a dummy request for initialization
            dummy_request = {
                "model": MODEL_NAME,
                "model": self.model_name,
                "prompt": [3] * 200,
                "max_tokens": 1,
                "ignore_eos": True,
@@ -447,7 +460,7 @@ async def test_eviction(self):
            logger.info(f"Block pool size: {block_pool_size}")

            # the dummy request can be reused
            openai_request = CompletionRequest(model=MODEL_NAME,
            openai_request = CompletionRequest(model=self.model_name,
                                               prompt=dummy_request["prompt"])
            server, info = await self.gen_router.get_next_server(openai_request)
            first_match = info["matches"][0]
@@ -503,8 +516,7 @@ def load_default_prompts(disaggregated_example_root: str):
@contextlib.contextmanager
def background_workers(llm_venv, config_file: str, num_ranks: int = None):
    cwd = llm_venv.get_working_directory()

    with open(os.path.join(cwd, 'output_workers.log'), 'w') as log_file:
    with open(os.path.join(cwd, 'output_workers.log'), 'w+') as log_file:
        workers_proc, ctx_servers, gen_servers = run_disaggregated_workers(
            config_file=config_file,
            stdout=log_file,
@@ -537,6 +549,30 @@ def test_workers_conditional_disaggregation(disaggregated_test_root,
        asyncio.run(tester.test_multi_round_request(prompts))


@pytest.mark.parametrize("deepseek_v3_model_root", ['DeepSeek-V3-Lite-bf16'],
                         indirect=True)
def test_workers_conditional_disaggregation_deepseek_v3_lite_bf16(
        disaggregated_test_root, disaggregated_example_root, llm_venv,
        deepseek_v3_model_root):
    config_file = os.path.join(
        disaggregated_test_root,
        'test_configs/disagg_config_cache_reuse_deepseek_v3.yaml')
    model_root = f"{llm_venv.get_working_directory()}/DeepSeek-V3-Lite/bf16"
    src_dst_dict = {
        deepseek_v3_model_root: model_root,
    }
    for src, dst in src_dst_dict.items():
        if not os.path.islink(dst):
            os.makedirs(os.path.dirname(dst), exist_ok=True)
            os.symlink(src, dst, target_is_directory=True)

    with background_workers(llm_venv, config_file,
                            2) as (ctx_servers, gen_servers):
        tester = ConditionalWorkerTester(ctx_servers, gen_servers)
        prompts = load_default_prompts(disaggregated_example_root)
        asyncio.run(tester.test_multi_round_request(prompts))


@pytest.mark.parametrize("llama_model_root", ['TinyLlama-1.1B-Chat-v1.0'],
                         indirect=True)
def test_workers_kv_cache_events(disaggregated_test_root,
@@ -570,6 +606,35 @@ def test_workers_kv_cache_aware_router(disaggregated_test_root,
        asyncio.run(tester.test_multi_round_request(prompts, 16, 4))


@skip_no_hopper
@pytest.mark.parametrize("deepseek_v3_model_root", ['DeepSeek-V3-Lite-bf16'],
                         indirect=True)
def test_workers_kv_cache_aware_router_deepseek_v3_lite_bf16(
        disaggregated_test_root, disaggregated_example_root, llm_venv,
        deepseek_v3_model_root):
    config_file = os.path.join(
        disaggregated_test_root,
        'test_configs/disagg_config_cache_aware_balance_deepseek_v3.yaml')
    model_root = f"{llm_venv.get_working_directory()}/DeepSeek-V3-Lite/bf16"
    src_dst_dict = {
        deepseek_v3_model_root: model_root,
    }
    for src, dst in src_dst_dict.items():
        if not os.path.islink(dst):
            os.makedirs(os.path.dirname(dst), exist_ok=True)
            os.symlink(src, dst, target_is_directory=True)

    with background_workers(llm_venv, config_file,
                            4) as (ctx_servers, gen_servers):
        os.chdir(llm_venv.get_working_directory())
        tester = KvCacheAwareRouterTester(ctx_servers,
                                          gen_servers,
                                          model_name="DeepSeek-V3-Lite/bf16",
                                          tokens_per_block=64)
        prompts = load_default_prompts(disaggregated_example_root)
        asyncio.run(tester.test_multi_round_request(prompts, 8, 4))
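
The DeepSeek variant passes `tokens_per_block=64` where the TinyLlama tests rely on the 32-token default; the router can only match cached prefixes at block granularity, so its hashing block size has to agree with the server's KV layout. An illustrative sketch of block-granular prefix matching (`hash` is a stand-in for the real `block_key_hasher`; the matching logic is an assumption, not the router's actual code):

```python
def count_matched_blocks(prompt_tokens: list,
                         cached_hashes: set,
                         tokens_per_block: int = 64) -> int:
    """Count leading prompt blocks already present in a server's cache."""
    matched, parent = 0, None
    for i in range(0, len(prompt_tokens), tokens_per_block):
        block = tuple(prompt_tokens[i:i + tokens_per_block])
        if len(block) < tokens_per_block:
            break  # partial trailing blocks cannot be matched
        # Chain-hash so a block's key depends on its whole prefix.
        parent = hash((parent, block))  # stand-in for block_key_hasher
        if parent not in cached_hashes:
            break
        matched += 1
    return matched
```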


@pytest.mark.parametrize("llama_model_root", ['TinyLlama-1.1B-Chat-v1.0'],
                         indirect=True)
def test_workers_kv_cache_aware_router_eviction(disaggregated_test_root,
4 changes: 4 additions & 0 deletions tests/integration/test_lists/test-db/l0_dgx_h100.yml
@@ -114,6 +114,10 @@ l0_dgx_h100:
  - disaggregated/test_disaggregated.py::test_disaggregated_deepseek_v3_lite_fp8_tp1_attention_dp_overlap_one_mtp[DeepSeek-V3-Lite-fp8]
  - disaggregated/test_disaggregated.py::test_disaggregated_deepseek_v3_lite_fp8_attention_dp_overlap_cuda_graph[DeepSeek-V3-Lite-fp8]
  - disaggregated/test_disaggregated.py::test_disaggregated_deepseek_v3_lite_fp8_overlap_cuda_graph[DeepSeek-V3-Lite-fp8]
  - disaggregated/test_disaggregated.py::test_disaggregated_deepseek_v3_lite_bf16_cache_aware_balance[DeepSeek-V3-Lite-bf16]
  - disaggregated/test_disaggregated.py::test_disaggregated_deepseek_v3_lite_bf16_conditional[DeepSeek-V3-Lite-bf16]
  - disaggregated/test_workers.py::test_workers_conditional_disaggregation_deepseek_v3_lite_bf16[DeepSeek-V3-Lite-bf16]
  - disaggregated/test_workers.py::test_workers_kv_cache_aware_router_deepseek_v3_lite_bf16[DeepSeek-V3-Lite-bf16]
- condition:
    ranges:
      system_gpu_count: