Merged
131 changes: 50 additions & 81 deletions tensorrt_llm/_torch/pyexecutor/py_executor.py
@@ -800,6 +800,50 @@ def _executor_loop_pp(self):
self.active_requests,
previous_batch)

def _prepare_and_schedule_batch(self):
new_requests = self._fetch_new_requests()
if self.should_stop_processing:
return None, None

if self.kv_cache_transceiver:
self._check_disagg_gen_transfer_status()

iter_stats = None
if self.enable_iter_perf_stats:
iter_stats = self._get_init_iter_stats(
len(new_requests),
self.executor_request_queue.
get_new_active_requests_queue_latency())

self._pad_attention_dp_dummy_request()

if self.drafter is not None:
self._prepare_draft_requests(self.active_requests)

scheduled_batch, fitting_disagg_gen_init_requests, num_fitting_reqs = self._schedule(
)

if self.kv_cache_transceiver:
            # For requests that fit disaggregated-generation init, also prepare resources in the KV cache manager
self._prepare_disagg_gen_init(fitting_disagg_gen_init_requests)

if num_fitting_reqs == 0 and not fitting_disagg_gen_init_requests:
logger.warning(
"num_fitting_reqs=0 and fitting_disagg_gen_init_requests is empty, may not have enough kvCache"
)
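                # Assumption (not stated in the PR): the argument 1 is read here as
                # "block until at least one context transfer completes", so the loop
                # does not busy-spin while no request fits in the KV cache.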
self.kv_cache_transceiver.check_context_transfer_status(1)
else:
            assert scheduled_batch.batch_size > 0, (
                "failed to schedule any pending request; "
                "probably ran out of resources.")

self.num_scheduled_requests = scheduled_batch.batch_size
logger.debug(
            f'has {len(self.active_requests)} active requests, '
f'scheduled {len(scheduled_batch.context_requests)} context requests and '
f'{len(scheduled_batch.generation_requests)} generation requests')
return scheduled_batch, iter_stats

def _executor_loop(self):
torch.cuda.set_device(self.device_id)
with self._profiler() as profile_step:
@@ -810,48 +854,10 @@ def _executor_loop(self):
profile_step()
if self.enable_iter_perf_stats:
iter_start_time = time.time()
new_requests = self._fetch_new_requests()
if self.should_stop_processing:
break

if self.kv_cache_transceiver:
self._check_disagg_gen_transfer_status()

if self.enable_iter_perf_stats:
iter_stats = self._get_init_iter_stats(
len(new_requests),
self.executor_request_queue.
get_new_active_requests_queue_latency())

self._pad_attention_dp_dummy_request()

if self.drafter is not None:
self._prepare_draft_requests(self.active_requests)

scheduled_batch, fitting_disagg_gen_init_requests, num_fitting_reqs = self._schedule(
)

if self.kv_cache_transceiver:
# For requests that are fitting disagg gen init, also prepare resources for KV cache manager
self._prepare_disagg_gen_init(
fitting_disagg_gen_init_requests)
if num_fitting_reqs == 0 and not fitting_disagg_gen_init_requests:
logger.warning(
"num_fitting_reqs=0 and fitting_disagg_gen_init_requests is empty, may not have enough kvCache"
)
self.kv_cache_transceiver.check_context_transfer_status(
1)
else:
assert scheduled_batch.batch_size > 0, (
"fail to schedule any pending request, "
"probably run out of resource.")

self.num_scheduled_requests = scheduled_batch.batch_size
logger.debug(
f'has {len(self.active_requests)} active_request, '
f'scheduled {len(scheduled_batch.context_requests)} context requests and '
f'{len(scheduled_batch.generation_requests)} generation requests'
)
scheduled_batch, iter_stats = self._prepare_and_schedule_batch()
if scheduled_batch is None:
break

self._pause_requests(scheduled_batch.paused_requests)
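
Both _executor_loop and _executor_loop_overlap now consume the extracted helper through the same two-line head: call _prepare_and_schedule_batch(), then break when it returns the (None, None) sentinel. A minimal sketch of that contract, assuming a toy MiniExecutor (hypothetical names; the real PyExecutor has many more collaborators):

from typing import Optional, Tuple


class MiniExecutor:
    """Toy stand-in for PyExecutor; it illustrates only the sentinel contract."""

    def __init__(self, requests):
        self._pending = list(requests)

    def _prepare_and_schedule_batch(self) -> Tuple[Optional[list], Optional[dict]]:
        # (None, None) is the stop sentinel: callers break out of their loop
        # instead of re-implementing the stop check in every loop variant.
        if not self._pending:
            return None, None
        batch, self._pending = self._pending[:2], self._pending[2:]
        return batch, {"batch_size": len(batch)}

    def executor_loop(self):
        while True:
            scheduled_batch, iter_stats = self._prepare_and_schedule_batch()
            if scheduled_batch is None:
                break
            print(f"processing {scheduled_batch} with {iter_stats}")


MiniExecutor(["r1", "r2", "r3"]).executor_loop()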

@@ -954,47 +960,10 @@ def _executor_loop_overlap(self):
profile_step()
if self.enable_iter_perf_stats:
iter_start_time = time.time()
new_requests = self._fetch_new_requests()
if self.should_stop_processing:
break

if self.kv_cache_transceiver:
self._check_disagg_gen_transfer_status()

if self.enable_iter_perf_stats:
iter_stats = self._get_init_iter_stats(
len(new_requests),
self.executor_request_queue.
get_new_active_requests_queue_latency())

self._pad_attention_dp_dummy_request()

scheduled_batch, fitting_disagg_gen_init_requests, num_fitting_reqs = self._schedule(
)

if self.kv_cache_transceiver:

# For requests that are fitting disagg gen init, also prepare resources for KV cache manager
self._prepare_disagg_gen_init(
fitting_disagg_gen_init_requests)

if num_fitting_reqs == 0 and not fitting_disagg_gen_init_requests:
logger.warning(
"num_fitting_reqs=0 and fitting_disagg_gen_init_requests is empty, may not have enough kvCache"
)
self.kv_cache_transceiver.check_context_transfer_status(
1)
else:
assert scheduled_batch.batch_size > 0, (
"fail to schedule any pending request, "
"probably run out of resource.")

self.num_scheduled_requests = scheduled_batch.batch_size
logger.debug(
f'has {len(self.active_requests)} active_request, '
f'scheduled {len(scheduled_batch.context_requests)} context requests and '
f'{len(scheduled_batch.generation_requests)} generation requests'
)
scheduled_batch, iter_stats = self._prepare_and_schedule_batch()
if scheduled_batch is None:
break

self._pause_requests(scheduled_batch.paused_requests)
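
A side benefit of the extraction (an observation, not a claim made in the PR): the loop head can now be exercised against a stub. A hypothetical pytest-style sketch using unittest.mock; the mocked attribute names mirror the diff above:

from unittest.mock import MagicMock


def test_loop_head_stops_on_sentinel():
    executor = MagicMock()
    executor._prepare_and_schedule_batch.return_value = (None, None)

    # Re-create the loop head from the diff against the mock.
    scheduled_batch, iter_stats = executor._prepare_and_schedule_batch()
    assert scheduled_batch is None and iter_stats is None

    # On the sentinel the loop must exit before pausing or scheduling anything.
    executor._pause_requests.assert_not_called()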
