From 37a625cdd52d402da18d09f850fe4f3ddb9b3d71 Mon Sep 17 00:00:00 2001 From: Peter Kraft Date: Thu, 9 Oct 2025 12:29:19 -0700 Subject: [PATCH 1/7] reference --- docs/python/reference/contexts.md | 44 ------------------ docs/python/reference/queues.md | 76 ++++++++++++++++++++++++++++++- 2 files changed, 75 insertions(+), 45 deletions(-) diff --git a/docs/python/reference/contexts.md b/docs/python/reference/contexts.md index 5ff5e01fd..e7c131119 100644 --- a/docs/python/reference/contexts.md +++ b/docs/python/reference/contexts.md @@ -803,50 +803,6 @@ with SetWorkflowTimeout(10): example_workflow() ``` -### SetEnqueueOptions - -```python -SetEnqueueOptions( - *, - deduplication_id: Optional[str] = None, - priority: Optional[int] = None, -) -``` - -Set options for enclosed workflow enqueue operations. -These options are **not propagated** to child workflows. - -**Parameters:** - -- `deduplication_id`: At any given time, only one workflow with a specific deduplication ID can be enqueued in the specified queue. If a workflow with a deduplication ID is currently enqueued or actively executing (status `ENQUEUED` or `PENDING`), subsequent workflow enqueue attempt with the same deduplication ID in the same queue will raise a `DBOSQueueDeduplicatedError` exception. Defaults to `None`. -- `priority`: The priority of the enqueued workflow in the specified queue. Workflows with the same priority are dequeued in **FIFO (first in, first out)** order. Priority values can range from `1` to `2,147,483,647`, where **a low number indicates a higher priority**. Defaults to `None`. Workflows without assigned priorities have the highest priority and are dequeued before workflows with assigned priorities. - - -**Deduplication Example** - -```python -with SetEnqueueOptions(deduplication_id="my_dedup_id"): - try: - handle = queue.enqueue(example_workflow, ...) - except DBOSQueueDeduplicatedError as e: - # Handle deduplication error -``` - -**Priority Example** - -```python -with SetEnqueueOptions(priority=10): - # All workflows are enqueued with priority set to 10 - # They will be dequeued in FIFO order - for task in tasks: - queue.enqueue(task_workflow, task) - -with SetEnqueueOptions(priority=1): - queue.enqueue(first_workflow) - -# first_workflow (priority=1) will be dequeued before all task_workflows (priority=10) -``` - ### DBOSContextEnsure ```python diff --git a/docs/python/reference/queues.md b/docs/python/reference/queues.md index 30bfb6207..a9616e303 100644 --- a/docs/python/reference/queues.md +++ b/docs/python/reference/queues.md @@ -15,7 +15,8 @@ Queue( limiter: Optional[Limiter] = None *, worker_concurrency: Optional[int] = None, - priority_enabled: bool = False, + priority_enabled: bool = False, + partition_queue: bool = False, ) class Limiter(TypedDict): @@ -31,6 +32,7 @@ If not provided, any number of functions may run concurrently. - `limiter`: A limit on the maximum number of functions which may be started in a given period. - `worker_concurrency`: The maximum number of functions from this queue that may run concurrently on a given DBOS process. Must be less than or equal to `concurrency`. - `priority_enabled`: Enable setting priority for workflows on this queue. +- `partition_queue`: Enable partitioning for this queue. **Example syntax:** @@ -115,3 +117,75 @@ async def process_tasks(tasks): # Return the results of all tasks. 
return [await handle.get_result() for handle in task_handles]
 ```
+
+
+### SetEnqueueOptions
+
+```python
+SetEnqueueOptions(
+    *,
+    deduplication_id: Optional[str] = None,
+    priority: Optional[int] = None,
+    app_version: Optional[str] = None,
+    queue_partition_key: Optional[str] = None,
+)
+```
+
+Set options for enclosed workflow enqueue operations.
+These options are **not propagated** to child workflows.
+
+**Parameters:**
+
+- `deduplication_id`: At any given time, only one workflow with a specific deduplication ID can be enqueued in the specified queue. If a workflow with a deduplication ID is currently enqueued or actively executing (status `ENQUEUED` or `PENDING`), subsequent workflow enqueue attempts with the same deduplication ID in the same queue will raise a `DBOSQueueDeduplicatedError` exception. Defaults to `None`.
+- `priority`: The priority of the enqueued workflow in the specified queue. Workflows with the same priority are dequeued in **FIFO (first in, first out)** order. Priority values can range from `1` to `2,147,483,647`, where **a low number indicates a higher priority**. Defaults to `None`. Workflows without assigned priorities have the highest priority and are dequeued before workflows with assigned priorities.
+- `app_version`: The application version of the workflow to enqueue. The workflow may only be dequeued by processes running that version. Defaults to the current application version.
+- `queue_partition_key`: The queue partition in which to enqueue this workflow. Use if and only if the queue is partitioned (`partition_queue=True`). In partitioned queues, all flow control (including concurrency and rate limits) is applied to individual partitions instead of the queue as a whole.
+
+
+**Deduplication Example**
+
+```python
+queue = Queue("example_queue")
+
+with SetEnqueueOptions(deduplication_id="my_dedup_id"):
+    try:
+        handle = queue.enqueue(example_workflow, ...)
+    except DBOSQueueDeduplicatedError as e:
+        # Handle deduplication error
+```
+
+**Priority Example**
+
+```python
+queue = Queue("example_queue", priority_enabled=True)
+
+with SetEnqueueOptions(priority=10):
+    # All workflows are enqueued with priority set to 10
+    # They will be dequeued in FIFO order
+    for task in tasks:
+        queue.enqueue(task_workflow, task)
+
+# first_workflow (priority=1) will be dequeued before all task_workflows (priority=10)
+with SetEnqueueOptions(priority=1):
+    queue.enqueue(first_workflow)
+
+```
+
+**Partitioned Queue Example**
+
+```python
+queue = Queue("queue", partition_queue=True, concurrency=1)
+
+@DBOS.workflow()
+def process_task(task: Task):
+    ...
+
+
+def on_user_task_submission(user_id: str, task: Task):
+    # Partition the task queue by user ID. As the queue has a
+    # maximum concurrency of 1, this means that at most one
+    # task can run at once per user (but tasks from different)
+    # users can run concurrently.
+ with SetEnqueueOptions(queue_partition_key=user_id): + queue.enqueue(process_task, task) +``` \ No newline at end of file From 24e8797a3de3fbc324dad7bd6b0508dc6d9c6dfc Mon Sep 17 00:00:00 2001 From: Peter Kraft Date: Thu, 9 Oct 2025 12:34:27 -0700 Subject: [PATCH 2/7] update --- docs/python/reference/queues.md | 10 ++++--- docs/python/tutorials/queue-tutorial.md | 36 ++++++++++++++++++++++--- 2 files changed, 39 insertions(+), 7 deletions(-) diff --git a/docs/python/reference/queues.md b/docs/python/reference/queues.md index a9616e303..e8e02f687 100644 --- a/docs/python/reference/queues.md +++ b/docs/python/reference/queues.md @@ -145,12 +145,15 @@ These options are **not propagated** to child workflows. **Deduplication Example** ```python +from dbos import DBOS, Queue, SetEnqueueOptions +from dbos import error as dboserror + queue = Queue("example_queue") with SetEnqueueOptions(deduplication_id="my_dedup_id"): try: handle = queue.enqueue(example_workflow, ...) - except DBOSQueueDeduplicatedError as e: + except dboserror.DBOSQueueDeduplicatedError as e: # Handle deduplication error ``` @@ -168,7 +171,6 @@ with SetEnqueueOptions(priority=10): # first_workflow (priority=1) will be dequeued before all task_workflows (priority=10) with SetEnqueueOptions(priority=1): queue.enqueue(first_workflow) - ``` **Partitioned Queue Example** @@ -184,8 +186,8 @@ def process_task(task: Task): def on_user_task_submission(user_id: str, task: Task): # Partition the task queue by user ID. As the queue has a # maximum concurrency of 1, this means that at most one - # task can run at once per user (but tasks from different) - # users can run concurrently. + # task can run at once per user (but tasks from different + # users can run concurrently). with SetEnqueueOptions(queue_partition_key=user_id): queue.enqueue(process_task, task) ``` \ No newline at end of file diff --git a/docs/python/tutorials/queue-tutorial.md b/docs/python/tutorials/queue-tutorial.md index 22160c699..ae637d746 100644 --- a/docs/python/tutorials/queue-tutorial.md +++ b/docs/python/tutorials/queue-tutorial.md @@ -191,6 +191,8 @@ Example syntax: from dbos import DBOS, Queue, SetEnqueueOptions from dbos import error as dboserror +queue = Queue("example_queue") + with SetEnqueueOptions(deduplication_id="my_dedup_id"): try: handle = queue.enqueue(example_workflow, ...) @@ -211,7 +213,7 @@ Workflows without assigned priorities have the highest priority and are dequeued Example syntax: ```python -queue = Queue("priority_queue", priority_enabled=True) +queue = Queue("example_queue", priority_enabled=True) with SetEnqueueOptions(priority=10): # All workflows are enqueued with priority set to 10 @@ -219,8 +221,36 @@ with SetEnqueueOptions(priority=10): for task in tasks: queue.enqueue(task_workflow, task) +# first_workflow (priority=1) will be dequeued before all task_workflows (priority=10) with SetEnqueueOptions(priority=1): queue.enqueue(first_workflow) - -# first_workflow (priority=1) will be dequeued before all task_workflows (priority=10) ``` + +## Partitioning Queues + +You can **partition** queues to distribute work across dynamically created queue partitions. +When you enqueue a workflow on a partitioned queue, you must supply a queue partition key. +Partitioned queues dequeue workflows and apply flow control limits for individual partitions, not for the entire queue. +Essentially, you can think of each partition as a "virtual queue" you can create dynamically by enqueueing a workflow with a partition key. 
+
+For example, suppose you want your users to each be able to run only one task at a time.
+You can do this with a partitioned queue with a maximum concurrency limit of 1 where the partition key is user ID.
+
+**Example Syntax**
+
+```python
+queue = Queue("queue", partition_queue=True, concurrency=1)
+
+@DBOS.workflow()
+def process_task(task: Task):
+    ...
+
+
+def on_user_task_submission(user_id: str, task: Task):
+    # Partition the task queue by user ID. As the queue has a
+    # maximum concurrency of 1, this means that at most one
+    # task can run at once per user (but tasks from different
+    # users can run concurrently).
+    with SetEnqueueOptions(queue_partition_key=user_id):
+        queue.enqueue(process_task, task)
+```
\ No newline at end of file

From 38e8dfdcec4a2b4786c918bfdaa42e9fac4759cf Mon Sep 17 00:00:00 2001
From: Peter Kraft
Date: Thu, 9 Oct 2025 12:37:53 -0700
Subject: [PATCH 3/7] nit

---
 docs/python/reference/queues.md         | 4 ++--
 docs/python/tutorials/queue-tutorial.md | 4 ++--
 2 files changed, 4 insertions(+), 4 deletions(-)

diff --git a/docs/python/reference/queues.md b/docs/python/reference/queues.md
index e8e02f687..7e41daf5d 100644
--- a/docs/python/reference/queues.md
+++ b/docs/python/reference/queues.md
@@ -160,7 +160,7 @@ with SetEnqueueOptions(deduplication_id="my_dedup_id"):
 **Priority Example**
 
 ```python
-queue = Queue("example_queue", priority_enabled=True)
+queue = Queue("priority_queue", priority_enabled=True)
 
 with SetEnqueueOptions(priority=10):
     # All workflows are enqueued with priority set to 10
@@ -176,7 +176,7 @@ with SetEnqueueOptions(priority=1):
 **Partitioned Queue Example**
 
 ```python
-queue = Queue("queue", partition_queue=True, concurrency=1)
+queue = Queue("partitioned_queue", partition_queue=True, concurrency=1)
 
 @DBOS.workflow()
 def process_task(task: Task):
diff --git a/docs/python/tutorials/queue-tutorial.md b/docs/python/tutorials/queue-tutorial.md
index ae637d746..d3316aec3 100644
--- a/docs/python/tutorials/queue-tutorial.md
+++ b/docs/python/tutorials/queue-tutorial.md
@@ -213,7 +213,7 @@ Workflows without assigned priorities have the highest priority and are dequeued
 Example syntax:
 
 ```python
-queue = Queue("example_queue", priority_enabled=True)
+queue = Queue("priority_queue", priority_enabled=True)
 
 with SetEnqueueOptions(priority=10):
     # All workflows are enqueued with priority set to 10
@@ -239,7 +239,7 @@ You can do this with a partitioned queue with a maximum concurrency limit of 1 w
 **Example Syntax**
 
 ```python
-queue = Queue("queue", partition_queue=True, concurrency=1)
+queue = Queue("partitioned_queue", partition_queue=True, concurrency=1)
 
 @DBOS.workflow()
 def process_task(task: Task):

From 3882854709ac3e0fe97af40c0c658406e43edca2 Mon Sep 17 00:00:00 2001
From: Peter Kraft
Date: Thu, 9 Oct 2025 12:38:39 -0700
Subject: [PATCH 4/7] nit

---
 docs/python/tutorials/queue-tutorial.md | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/docs/python/tutorials/queue-tutorial.md b/docs/python/tutorials/queue-tutorial.md
index d3316aec3..457dd9704 100644
--- a/docs/python/tutorials/queue-tutorial.md
+++ b/docs/python/tutorials/queue-tutorial.md
@@ -231,7 +231,7 @@ with SetEnqueueOptions(priority=1):
 You can **partition** queues to distribute work across dynamically created queue partitions.
 When you enqueue a workflow on a partitioned queue, you must supply a queue partition key.
 Partitioned queues dequeue workflows and apply flow control limits for individual partitions, not for the entire queue.
-Essentially, you can think of each partition as a "virtual queue" you can create dynamically by enqueueing a workflow with a partition key.
+Essentially, you can think of each partition as a "virtual queue" you dynamically create by enqueueing a workflow with a partition key.
 
 For example, suppose you want your users to each be able to run only one task at a time.
 You can do this with a partitioned queue with a maximum concurrency limit of 1 where the partition key is user ID.

From f02456b19d95dbfaa62a6ecd338d72947f178379 Mon Sep 17 00:00:00 2001
From: Peter Kraft
Date: Thu, 9 Oct 2025 12:38:56 -0700
Subject: [PATCH 5/7] nit

---
 docs/python/tutorials/queue-tutorial.md | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/docs/python/tutorials/queue-tutorial.md b/docs/python/tutorials/queue-tutorial.md
index 457dd9704..444dad827 100644
--- a/docs/python/tutorials/queue-tutorial.md
+++ b/docs/python/tutorials/queue-tutorial.md
@@ -233,7 +233,7 @@
 When you enqueue a workflow on a partitioned queue, you must supply a queue partition key.
 Partitioned queues dequeue workflows and apply flow control limits for individual partitions, not for the entire queue.
 Essentially, you can think of each partition as a "virtual queue" you dynamically create by enqueueing a workflow with a partition key.
-For example, suppose you want your users to each be able to run only one task at a time.
+For example, suppose you want your users to each be able to run at most one task at a time.
 You can do this with a partitioned queue with a maximum concurrency limit of 1 where the partition key is user ID.
 
 **Example Syntax**

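For illustration, here is a minimal sketch of the per-partition flow control these patches document, assuming only the `partition_queue` and `queue_partition_key` options shown above (the queue name, user IDs, and task payloads are hypothetical):

```python
from dbos import DBOS, Queue, SetEnqueueOptions

# concurrency=1 is enforced per partition, so each user's partition
# runs at most one task at a time.
queue = Queue("per_user_queue", partition_queue=True, concurrency=1)

@DBOS.workflow()
def process_task(task: str):
    DBOS.logger.info(f"Processing {task}")

def enqueue_user_tasks():
    # Alice's two tasks share a partition and run one at a time;
    # Bob's task is in a different partition and can run concurrently.
    for user_id, task in [("alice", "a1"), ("alice", "a2"), ("bob", "b1")]:
        with SetEnqueueOptions(queue_partition_key=user_id):
            queue.enqueue(process_task, task)
```
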
From 51c496e180764f1ccf0a645453f858148bd3b526 Mon Sep 17 00:00:00 2001 From: Peter Kraft Date: Fri, 10 Oct 2025 11:53:14 -0700 Subject: [PATCH 7/7] update --- docs/python/tutorials/queue-tutorial.md | 88 +++++++++---------- docs/typescript/reference/methods.md | 7 +- docs/typescript/reference/queues.md | 3 + docs/typescript/tutorials/queue-tutorial.md | 94 +++++++++++++-------- 4 files changed, 109 insertions(+), 83 deletions(-) diff --git a/docs/python/tutorials/queue-tutorial.md b/docs/python/tutorials/queue-tutorial.md index 1584debd4..7cc66d924 100644 --- a/docs/python/tutorials/queue-tutorial.md +++ b/docs/python/tutorials/queue-tutorial.md @@ -99,7 +99,6 @@ queue = Queue("example_queue", worker_concurrency=5) #### Global Concurrency - Global concurrency limits the total number of workflows from a queue that can run concurrently across all DBOS processes in your application. For example, this queue will have a maximum of 10 workflows running simultaneously across your entire application. @@ -114,22 +113,7 @@ from dbos import Queue queue = Queue("example_queue", concurrency=10) ``` - - - -### Rate Limiting - -You can set _rate limits_ for a queue, limiting the number of functions that it can start in a given period. -Rate limits are global across all DBOS processes using this queue. -For example, this queue has a limit of 50 with a period of 30 seconds, so it may not start more than 50 functions in 30 seconds: - -```python -queue = Queue("example_queue", limiter={"limit": 50, "period": 30}) -``` - -Rate limits are especially useful when working with a rate-limited API, such as many LLM APIs. - -### In-Order Processing +#### In-Order Processing You can use a queue with `concurrency=1` to guarantee sequential, in-order processing of events. Only a single event will be processed at a time. @@ -153,6 +137,18 @@ def event_endpoint(event: str): queue.enqueue(process_event, event) ``` +### Rate Limiting + +You can set _rate limits_ for a queue, limiting the number of functions that it can start in a given period. +Rate limits are global across all DBOS processes using this queue. +For example, this queue has a limit of 50 with a period of 30 seconds, so it may not start more than 50 functions in 30 seconds: + +```python +queue = Queue("example_queue", limiter={"limit": 50, "period": 30}) +``` + +Rate limits are especially useful when working with a rate-limited API, such as many LLM APIs. + ## Setting Timeouts @@ -177,6 +173,35 @@ with SetWorkflowTimeout(10): queue.enqueue(example_workflow) ``` +## Partitioning Queues + +You can **partition** queues to distribute work across dynamically created queue partitions. +When you enqueue a workflow on a partitioned queue, you must supply a queue partition key. +Partitioned queues dequeue workflows and apply flow control limits for individual partitions, not for the entire queue. +Essentially, you can think of each partition as a "subqueue" you dynamically create by enqueueing a workflow with a partition key. + +For example, suppose you want your users to each be able to run at most one task at a time. +You can do this with a partitioned queue with a maximum concurrency limit of 1 where the partition key is user ID. + +**Example Syntax** + +```python +queue = Queue("partitioned_queue", partition_queue=True, concurrency=1) + +@DBOS.workflow() +def process_task(task: Task): + ... + + +def on_user_task_submission(user_id: str, task: Task): + # Partition the task queue by user ID. 
As the queue has a + # maximum concurrency of 1, this means that at most one + # task can run at once per user (but tasks from different + # users can run concurrently). + with SetEnqueueOptions(queue_partition_key=user_id): + queue.enqueue(process_task, task) +``` + ## Deduplication You can set a deduplication ID for an enqueued workflow with [`SetEnqueueOptions`](../reference/contexts.md#setenqueueoptions). @@ -225,32 +250,3 @@ with SetEnqueueOptions(priority=10): with SetEnqueueOptions(priority=1): queue.enqueue(first_workflow) ``` - -## Partitioning Queues - -You can **partition** queues to distribute work across dynamically created queue partitions. -When you enqueue a workflow on a partitioned queue, you must supply a queue partition key. -Partitioned queues dequeue workflows and apply flow control limits for individual partitions, not for the entire queue. -Essentially, you can think of each partition as a "subqueue" you dynamically create by enqueueing a workflow with a partition key. - -For example, suppose you want your users to each be able to run at most one task at a time. -You can do this with a partitioned queue with a maximum concurrency limit of 1 where the partition key is user ID. - -**Example Syntax** - -```python -queue = Queue("partitioned_queue", partition_queue=True, concurrency=1) - -@DBOS.workflow() -def process_task(task: Task): - ... - - -def on_user_task_submission(user_id: str, task: Task): - # Partition the task queue by user ID. As the queue has a - # maximum concurrency of 1, this means that at most one - # task can run at once per user (but tasks from different - # users can run concurrently). - with SetEnqueueOptions(queue_partition_key=user_id): - queue.enqueue(process_task, task) -``` \ No newline at end of file diff --git a/docs/typescript/reference/methods.md b/docs/typescript/reference/methods.md index c132bfb2f..44cc5991b 100644 --- a/docs/typescript/reference/methods.md +++ b/docs/typescript/reference/methods.md @@ -25,6 +25,7 @@ interface StartWorkflowParams { export interface EnqueueOptions { deduplicationID?: string; priority?: number; + queuePartitionKey?: string; } ``` @@ -64,8 +65,10 @@ const handle = await DBOS.startWorkflow(Example).exampleWorkflow(input); - **workflowID**: An ID to assign to the workflow. If not specified, a random UUID is generated. - **queueName**: The name of the queue on which to enqueue this workflow, if any. - **timeoutMS**: The timeout of this workflow in milliseconds. -- **deduplicationID**: Optionally specified when enqueueing a workflow. At any given time, only one workflow with a specific deduplication ID can be enqueued in the specified queue. If a workflow with a deduplication ID is currently enqueued or actively executing (status `ENQUEUED` or `PENDING`), subsequent workflow enqueue attempt with the same deduplication ID in the same queue will raise a `DBOSQueueDuplicatedError` exception. -- **priority**: Optionally specified when enqueueing a workflow. The priority of the enqueued workflow in the specified queue. Workflows with the same priority are dequeued in **FIFO (first in, first out)** order. Priority values can range from `1` to `2,147,483,647`, where **a low number indicates a higher priority**. Workflows without assigned priorities have the highest priority and are dequeued before workflows with assigned priorities. +- **enqueueOptions**: + - **deduplicationID**: At any given time, only one workflow with a specific deduplication ID can be enqueued in the specified queue. 
If a workflow with a deduplication ID is currently enqueued or actively executing (status `ENQUEUED` or `PENDING`), subsequent workflow enqueue attempts with the same deduplication ID in the same queue will raise a `DBOSQueueDuplicatedError` exception.
+  - **priority**: The priority of the enqueued workflow in the specified queue. Workflows with the same priority are dequeued in **FIFO (first in, first out)** order. Priority values can range from `1` to `2,147,483,647`, where **a low number indicates a higher priority**. Workflows without assigned priorities have the highest priority and are dequeued before workflows with assigned priorities.
+  - **queuePartitionKey**: The queue partition in which to enqueue this workflow. Use if and only if the queue is partitioned (`partitionQueue: true`). In partitioned queues, all flow control (including concurrency and rate limits) is applied to individual partitions instead of the queue as a whole.
 
 ### DBOS.send
diff --git a/docs/typescript/reference/queues.md b/docs/typescript/reference/queues.md
index c346e4745..2f4598ecb 100644
--- a/docs/typescript/reference/queues.md
+++ b/docs/typescript/reference/queues.md
@@ -20,6 +20,7 @@ interface QueueParameters {
   concurrency?: number;
   rateLimit?: QueueRateLimit;
   priorityEnabled?: boolean;
+  partitionQueue?: boolean;
 }
 
 interface QueueRateLimit {
@@ -37,6 +38,8 @@
 - **rateLimit.limitPerPeriod**: The number of workflows that may be started within the specified time period.
 - **rateLimit.periodSec**: The time period across which `limitPerPeriod` applies.
 - **priorityEnabled**: Enable setting priority for workflows on this queue.
+- **partitionQueue**: Enable partitioning for this queue.
+
 
 **Example syntax:**
diff --git a/docs/typescript/tutorials/queue-tutorial.md b/docs/typescript/tutorials/queue-tutorial.md
index 98eda0dbe..6516b006b 100644
--- a/docs/typescript/tutorials/queue-tutorial.md
+++ b/docs/typescript/tutorials/queue-tutorial.md
@@ -126,6 +126,41 @@ import { DBOS, WorkflowQueue } from "@dbos-inc/dbos-sdk";
 const queue = new WorkflowQueue("example_queue", { concurrency: 10 });
 ```
 
+#### In-Order Processing
+
+You can use a queue with `concurrency: 1` to guarantee sequential, in-order processing of events.
+Only a single event will be processed at a time.
+For example, this app processes events sequentially in the order of their arrival:
+
+```javascript
+import { DBOS, WorkflowQueue } from "@dbos-inc/dbos-sdk";
+import express from "express";
+
+const serialQueue = new WorkflowQueue("in_order_queue", { concurrency: 1 });
+const app = express();
+
+class Tasks {
+  @DBOS.workflow()
+  static async processTask(task){
+    // ... process task
+  }
+}
+
+app.get("/events/:event", async (req, res) => {
+  await DBOS.startWorkflow(Tasks, {queueName: serialQueue.name}).processTask(req.params);
+  await res.send("Workflow Started!");
+});
+
+// Launch DBOS and start the server
+async function main() {
+  await DBOS.launch();
+  app.listen(3000, () => {});
+}
+
+main().catch(console.log);
+```
+
+
 ### Rate Limiting
 
 You can set _rate limits_ for a queue, limiting the number of functions that it can start in a given period.
@@ -164,6 +199,30 @@ async function main() {
 }
 ```
 
+### Partitioning Queues
+
+You can **partition** queues to distribute work across dynamically created queue partitions.
+When you enqueue a workflow on a partitioned queue, you must supply a queue partition key.
+Partitioned queues dequeue workflows and apply flow control limits for individual partitions, not for the entire queue. +Essentially, you can think of each partition as a "subqueue" you dynamically create by enqueueing a workflow with a partition key. + +For example, suppose you want your users to each be able to run at most one task at a time. +You can do this with a partitioned queue with a maximum concurrency limit of 1 where the partition key is user ID. + +**Example Syntax** + +```ts +const queue = new WorkflowQueue("example_queue", { partitionQueue: true, concurrency: 1 }); + +async function onUserTaskSubmission(userID: string, task: Task) { + // Partition the task queue by user ID. As the queue has a + // maximum concurrency of 1, this means that at most one + // task can run at once per user (but tasks from different + // users can run concurrently). + await DBOS.startWorkflow(taskWorkflow, {queueName: queue.name, enqueueOptions: {queuePartitionKey: userID}})(task); +} +``` + ### Deduplication You can set a deduplication ID for an enqueued workflow as an argument to `DBOS.startWorkflow`. @@ -219,38 +278,3 @@ async function main() { const handle = await DBOS.startWorkflow(taskWorkflow, {queueName: queue.name, enqueueOptions: {priority: priority}})(task); } ``` - -### In-Order Processing - -You can use a queue with `concurrency=1` to guarantee sequential, in-order processing of events. -Only a single event will be processed at a time. -For example, this app processes events sequentially in the order of their arrival: - -```javascript -import { DBOS, WorkflowQueue } from "@dbos-inc/dbos-sdk"; -import express from "express"; - -const serialQueue = new WorkflowQueue("in_order_queue", { concurrency: 1 }); -const app = express(); - -class Tasks { - @DBOS.workflow() - static async processTask(task){ - // ... process task - } -} - -app.get("/events/:event", async (req, res) => { - await DBOS.startWorkflow(Tasks, {queueName: serialQueue.name}).processTask(req.params); - await res.send("Workflow Started!"); -}); - -// Launch DBOS and start the server -async function main() { - await DBOS.launch(); - app.listen(3000, () => {}); -} - -main().catch(console.log); -``` -
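As a closing illustration, here is a minimal sketch combining a partition key with a deduplication ID, assuming only the `EnqueueOptions` fields documented above (the queue name, class name, and ID scheme are hypothetical): each user gets an isolated subqueue, and a given task cannot be enqueued a second time while the first is still `ENQUEUED` or `PENDING`.

```typescript
import { DBOS, WorkflowQueue } from "@dbos-inc/dbos-sdk";

const userQueue = new WorkflowQueue("user_task_queue", { partitionQueue: true, concurrency: 1 });

class UserTasks {
  @DBOS.workflow()
  static async processTask(taskID: string) {
    DBOS.logger.info(`Processing ${taskID}`);
  }
}

async function submitUserTask(userID: string, taskID: string) {
  // Each user gets an isolated partition (at most one of their tasks
  // runs at a time), and the deduplication ID makes a second enqueue
  // of the same task throw DBOSQueueDuplicatedError while the first
  // is still ENQUEUED or PENDING.
  return DBOS.startWorkflow(UserTasks, {
    queueName: userQueue.name,
    enqueueOptions: {
      queuePartitionKey: userID,
      deduplicationID: `${userID}-${taskID}`,
    },
  }).processTask(taskID);
}
```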