DevJoghurt/nuxt-queue

Nvent

Event-sourced queue and flow orchestration for Nuxt. Zero dependencies to get started - built-in memory/file adapters included. Scale to Redis when ready.

✨ Features

  • 🚀 Zero Setup: Start instantly with built-in memory/file adapters - no Redis required
  • 🔄 Queue Management: Reliable job processing with pluggable queue adapters
  • 🎭 Flow Orchestration: Multi-step workflows with event sourcing
  • ⏰ Flow Scheduling: Cron-based and delayed flow execution
  • 🔌 Pluggable Adapters: Built-in memory/file support, optional Redis adapters for production
  • 📊 Event Sourcing: Complete audit trail of all flow operations
  • 🎨 Development UI: Visual flow diagrams, timeline, and scheduling (separate package)
  • 🔌 Function Context: Rich runtime with state, logging, and events
  • 📦 Auto-discovery: Filesystem-based worker registry
  • 🚀 Horizontal Scaling: Stateless architecture with Redis adapters
  • 🔍 Full Observability: Real-time logs, metrics, and event streams
  • 🛑 Flow Control: Cancel running flows, detect stalled flows, query flow status

Version: v0.4.5
Status: ✅ Current Implementation
Last Updated: 2025-11-18

✅ Core queue and flow functionality
✅ Built-in memory/file adapters - no Redis required to start
✅ Optional Redis adapters for production scaling
✅ Event sourcing with stream adapters
✅ Real-time monitoring UI (separate @nvent-addon/app package)
✅ Flow scheduling (cron patterns and delays)
✅ Flow control (cancel, query running flows, stall detection)
✅ Worker context with state, logging, and events
✅ Auto-discovery and flow analysis
🚧 Comprehensive trigger system (planned v0.5)
🚧 Python functions (planned v0.5)
🚧 Postgres adapters (planned v0.6)

🗃️ Event Schema & Storage

All flow operations are event-sourced and stored in streams (nq:flow:<runId>). Events are immutable, type-safe, and provide a complete audit trail.

Event types:

  • flow.start, flow.completed, flow.failed, flow.cancel, flow.stalled
  • step.started, step.completed, step.failed, step.retry
  • log, emit, state

Storage Options:

  • Built-in: Memory (development), File (persistence without database)
  • Production: Redis Streams with @nvent-addon/adapter-stream-redis

See Event Schema for full details and field definitions.
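
To make the stream layout concrete, here is a minimal sketch of an append-only event log keyed by `nq:flow:<runId>`. The event names come from the list above; the record fields (`ts`, `stepName`, `data`) and the helper names `flowStreamKey` and `appendFlowEvent` are assumptions for illustration, not nvent's actual schema or API.

```typescript
// Event names as listed in this README.
type FlowEventType =
  | 'flow.start' | 'flow.completed' | 'flow.failed' | 'flow.cancel' | 'flow.stalled'
  | 'step.started' | 'step.completed' | 'step.failed' | 'step.retry'
  | 'log' | 'emit' | 'state'

// Hypothetical record shape, for illustration only.
interface FlowEventRecord {
  type: FlowEventType
  runId: string
  ts: number          // epoch milliseconds
  stepName?: string
  data?: unknown
}

// Stream key pattern from the README: nq:flow:<runId>
function flowStreamKey(runId: string): string {
  return `nq:flow:${runId}`
}

// In-memory stand-in for a stream adapter: one append-only array per stream key.
const eventStreams: Record<string, FlowEventRecord[]> = {}

// Append a frozen copy of the event and return the new stream length.
function appendFlowEvent(record: FlowEventRecord): number {
  const key = flowStreamKey(record.runId)
  const stream = eventStreams[key] || []
  stream.push(Object.freeze({ ...record }))  // frozen, so stored events stay immutable
  eventStreams[key] = stream
  return stream.length
}

appendFlowEvent({ type: 'flow.start', runId: 'demo-run', ts: Date.now() })
appendFlowEvent({ type: 'step.started', runId: 'demo-run', ts: Date.now(), stepName: 'start' })
```

Because events are only ever appended, the audit trail for a run is simply the contents of its stream in order.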

🏆 Best Practices

  • Keep steps small and focused
  • Use state for shared data between steps
  • Use ctx.flow.emit() to trigger downstream steps
  • Log with context using ctx.logger.log()
  • Set concurrency based on resource needs
  • Use on-complete state cleanup for automatic state management
  • Document schedules with metadata for maintainability

⚠️ Limitations (v0.4.5)

  1. TypeScript only: Python functions not yet implemented (planned for v0.5)
  2. No complex triggers: Only basic scheduling available (v0.5 will add triggers)
  3. No await patterns: Pausing flows for time/events planned for v0.5
  4. No Postgres adapters: Only memory/file/Redis adapters available (Postgres planned for v0.6)
  5. State separate from events: Not unified with stream store (planned for v0.6)
  6. Basic logging: No advanced logger adapters (planned for v0.7)
  7. No schedule editing: Must delete and recreate schedules (v0.5 will add full trigger management)
  8. File adapter limitations: Single instance only, not suitable for horizontal scaling

🚀 Quick Start

Installation

Core package (zero dependencies to start):

npm install nvent

Optional UI package:

npm install @nvent-addon/app

Optional Redis adapters for production:

npm install @nvent-addon/adapter-queue-redis
npm install @nvent-addon/adapter-store-redis
npm install @nvent-addon/adapter-stream-redis

Configuration

Minimal setup (uses built-in memory adapters):

// nuxt.config.ts
export default defineNuxtConfig({
  modules: ['nvent'],
})

With persistence (file adapters):

// nuxt.config.ts
export default defineNuxtConfig({
  modules: ['nvent'],
  nvent: {
    queue: {
      adapter: 'file',
      dataDir: './.data/queue'
    },
    store: {
      adapter: 'file',
      dataDir: './.data/store'
    },
    stream: {
      adapter: 'file',
      dataDir: './.data/stream'
    }
  }
})

Production setup (Redis adapters):

// nuxt.config.ts
export default defineNuxtConfig({
  modules: [
    'nvent',
    '@nvent-addon/adapter-queue-redis',
    '@nvent-addon/adapter-store-redis',
    '@nvent-addon/adapter-stream-redis',
    '@nvent-addon/app'  // Optional UI
  ],
  nvent: {
    // Shared Redis connection
    connections: {
      redis: {
        host: '127.0.0.1',
        port: 6379
      }
    },
    // Configure adapters
    queue: {
      adapter: 'redis',
      defaultConfig: { concurrency: 2 }
    },
    store: {
      adapter: 'redis'
    },
    stream: {
      adapter: 'redis'
    },
    // Flow configuration
    flows: {
      stallDetection: {
        enabled: true,
        timeout: 300000  // 5 minutes
      }
    }
  }
})
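
The `stallDetection.timeout` value is given in milliseconds. As a rough sketch of what such a check could look like, the snippet below assumes a run counts as stalled when it has not finished and its most recent event is older than the timeout. That criterion, and the names `RunActivity` and `isRunStalled`, are assumptions for illustration; nvent's actual detection logic may differ.

```typescript
// Assumed view of a run: when its last event was appended, and whether it has ended.
interface RunActivity {
  lastEventAt: number   // epoch milliseconds of the most recent event
  finished: boolean     // true once a terminal event has been recorded
}

// 300000 ms, the timeout used in the config above
const FIVE_MINUTES_MS = 5 * 60 * 1000

// A run is treated as stalled if it is unfinished and idle for longer than the timeout.
function isRunStalled(run: RunActivity, nowMs: number, timeoutMs: number): boolean {
  if (run.finished) return false
  return nowMs - run.lastEventAt > timeoutMs
}

const idleRun: RunActivity = { lastEventAt: Date.now() - 6 * 60 * 1000, finished: false }
console.log(isRunStalled(idleRun, Date.now(), FIVE_MINUTES_MS))
```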

Create Your First Function

// server/functions/example/process.ts
export default defineFunction(async (job, ctx) => {
  // Access job data
  const { message } = job.data
  
  // Log to stream
  ctx.logger.log('info', 'Processing message', { message })
  
  // Store state
  await ctx.state.set('processedAt', new Date().toISOString())
  
  // Return result
  return { success: true, processed: message }
})

export const config = defineFunctionConfig({
  concurrency: 5,
})

Enqueue a Job

// In an API route or other server-side code
const queue = useQueueAdapter()
await queue.enqueue('process', {
  name: 'process',
  data: { message: 'Hello World' }
})

Create a Flow

Multi-step workflows with event-driven orchestration:

// server/functions/my-flow/start.ts
export default defineFunction(async (job, ctx) => {
  ctx.logger.log('info', 'Flow started')
  const prepared = { step: 1, data: job.data }
  
  // Emit event to trigger next steps
  ctx.flow.emit('data.prepared', prepared)
  
  return prepared
})

export const config = defineFunctionConfig({
  flow: {
    name: ['my-flow'],
    role: 'entry',
    step: 'start',
    emits: ['data.prepared']
  }
})

// server/functions/my-flow/process.ts
export default defineFunction(async (job, ctx) => {
  // processData is a placeholder for your own application logic (not provided by nvent)
  const result = await processData(job.data)
  
  // Emit to trigger next step
  ctx.flow.emit('data.processed', result)
  
  return result
})

export const config = defineFunctionConfig({
  flow: {
    name: ['my-flow'],
    role: 'step',
    step: 'process',
    subscribes: ['data.prepared'],  // Triggered by start
    emits: ['data.processed']
  }
})

// server/functions/my-flow/validate.ts
export default defineFunction(async (job, ctx) => {
  // validate is a placeholder for your own application logic (not provided by nvent)
  const validated = await validate(job.data)
  ctx.flow.emit('validation.complete', validated)
  return validated
})

export const config = defineFunctionConfig({
  flow: {
    name: ['my-flow'],
    role: 'step',
    step: 'validate',
    subscribes: ['data.prepared'],  // Also triggered by start (parallel with process)
    emits: ['validation.complete']
  }
})

Start the flow:

const { startFlow } = useFlowEngine()
await startFlow('my-flow', { input: 'data' })

Check flow status:

const { isRunning, getRunningFlows, cancelFlow } = useFlowEngine()

// Check if a specific run is still active (runId is the ID of that flow run)
const running = await isRunning('my-flow', runId)

// Get all running instances of a flow
const runs = await getRunningFlows('my-flow')

// Cancel a running flow
await cancelFlow('my-flow', runId)

Flow execution: The entry step emits data.prepared → the process and validate steps both run in parallel (each subscribes to data.prepared) → each emits its own completion event for downstream steps.
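
The fan-out described above follows directly from the `subscribes` and `emits` fields. The sketch below models that routing with plain data. The `StepRoute` shape is a reduced copy of the `flow` config blocks shown earlier, and `stepsTriggeredBy` is a hypothetical helper, not part of nvent.

```typescript
// Reduced copy of the `flow` config fields used in the examples above.
interface StepRoute {
  step: string
  role: 'entry' | 'step'
  subscribes?: string[]
  emits?: string[]
}

const myFlowSteps: StepRoute[] = [
  { step: 'start', role: 'entry', emits: ['data.prepared'] },
  { step: 'process', role: 'step', subscribes: ['data.prepared'], emits: ['data.processed'] },
  { step: 'validate', role: 'step', subscribes: ['data.prepared'], emits: ['validation.complete'] },
]

// Return the names of all steps that subscribe to the given event.
function stepsTriggeredBy(eventName: string, steps: StepRoute[]): string[] {
  return steps
    .filter(s => (s.subscribes || []).indexOf(eventName) !== -1)
    .map(s => s.step)
}

console.log(stepsTriggeredBy('data.prepared', myFlowSteps))   // both 'process' and 'validate'
console.log(stepsTriggeredBy('data.processed', myFlowSteps))  // no subscribers yet
```

Adding a further step that subscribes to `data.processed` or `validation.complete` would extend the flow without touching the existing steps.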

Schedule a Flow

Schedule flows to run automatically with cron patterns or delays:

// Schedule a flow to run daily at 2 AM
await $fetch('/api/_flows/my-flow/schedule', {
  method: 'POST',
  body: {
    cron: '0 2 * * *',
    input: { retentionDays: 30 },
    metadata: {
      description: 'Daily cleanup job'
    }
  }
})

// Schedule a one-time delayed execution (5 minutes)
await $fetch('/api/_flows/reminder-flow/schedule', {
  method: 'POST',
  body: {
    delay: 300000,  // milliseconds
    input: { userId: '123', message: 'Check your email' }
  }
})

// List all schedules for a flow
const schedules = await $fetch('/api/_flows/my-flow/schedules')

// Delete a schedule
await $fetch('/api/_flows/my-flow/schedules/schedule-id', {
  method: 'DELETE'
})

Common cron patterns:

  • * * * * * - Every minute
  • */5 * * * * - Every 5 minutes
  • 0 * * * * - Every hour
  • 0 2 * * * - Daily at 2 AM
  • 0 9 * * 1 - Every Monday at 9 AM
  • 0 0 1 * * - First day of month at midnight
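
To show how the five fields (minute, hour, day of month, month, day of week) are read, here is a deliberately small matcher. It handles only `*`, `*/n`, and single numbers, which covers the patterns listed above, and it evaluates in UTC as an assumption; which time zone nvent's scheduler uses is not stated here. It is not the parser nvent uses.

```typescript
// Match one cron field against a value. Supports '*', '*/n', and a single number only.
function cronFieldMatches(field: string, value: number): boolean {
  if (field === '*') return true
  if (field.slice(0, 2) === '*/') {
    const stepSize = parseInt(field.slice(2), 10)
    return stepSize > 0 && value % stepSize === 0
  }
  return parseInt(field, 10) === value
}

// Check whether a 5-field pattern matches the given instant (evaluated in UTC).
function cronMatches(pattern: string, when: Date): boolean {
  const fields = pattern.trim().split(/\s+/)
  if (fields.length !== 5) throw new Error('expected 5 cron fields')
  const values = [
    when.getUTCMinutes(),
    when.getUTCHours(),
    when.getUTCDate(),
    when.getUTCMonth() + 1,  // cron months are 1-12, JavaScript months are 0-11
    when.getUTCDay(),        // 0 = Sunday, 1 = Monday
  ]
  return fields.every((field, i) => cronFieldMatches(field, values[i]))
}

// 2025-11-17 is a Monday, so "0 9 * * 1" matches at 09:00 UTC on that day.
console.log(cronMatches('0 9 * * 1', new Date(Date.UTC(2025, 10, 17, 9, 0))))  // true
```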

🎨 Development UI

Install the UI package:

npm install @nvent-addon/app

Add to your Nuxt modules:

export default defineNuxtConfig({
  modules: ['nvent', '@nvent-addon/app']
})

Access the built-in UI through the <NventApp /> component. It provides:

  • 📊 Dashboard: Overview of queues and flows
  • 🔄 Flow Diagrams: Visual representation with Vue Flow
  • ⏰ Flow Scheduling: Create and manage cron-based or delayed schedules
  • 📝 Event Timeline: Real-time event stream with step details
  • 📋 Logs: Filtered logging by flow/step
  • 📈 Metrics: Queue statistics and performance
  • 🔍 Flow Runs: Complete history with status tracking (running, completed, failed, canceled, stalled)

🏗️ Architecture

Pluggable Adapters

Nvent uses a three-tier adapter system:

  1. Queue Adapter: Job processing and scheduling

    • Built-in: memory, file
    • Redis: @nvent-addon/adapter-queue-redis (BullMQ)
  2. Store Adapter: Document and key-value storage

    • Built-in: memory, file
    • Redis: @nvent-addon/adapter-store-redis
  3. Stream Adapter: Event sourcing and real-time distribution

    • Built-in: memory, file
    • Redis: @nvent-addon/adapter-stream-redis (Redis Streams + Pub/Sub)
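
The three tiers are configured independently, so a project can mix built-in and Redis adapters. The sketch below maps an adapter selection to the addon packages listed above. The helper names are hypothetical, and treating "all three tiers on Redis" as the condition for horizontal scaling is this example's reading of the feature list and the file-adapter limitation, not a documented rule.

```typescript
type AdapterKind = 'memory' | 'file' | 'redis'
type AdapterTier = 'queue' | 'store' | 'stream'
type AdapterSelection = Record<AdapterTier, AdapterKind>

// Addon package names as listed in this README.
const REDIS_ADDONS: Record<AdapterTier, string> = {
  queue: '@nvent-addon/adapter-queue-redis',
  store: '@nvent-addon/adapter-store-redis',
  stream: '@nvent-addon/adapter-stream-redis',
}

const ADAPTER_TIERS: AdapterTier[] = ['queue', 'store', 'stream']

// Addon packages that must be installed for the chosen adapters.
function requiredAddons(selection: AdapterSelection): string[] {
  return ADAPTER_TIERS
    .filter(tier => selection[tier] === 'redis')
    .map(tier => REDIS_ADDONS[tier])
}

// Assumption: multiple instances can only share work when every tier is backed by Redis.
function supportsHorizontalScaling(selection: AdapterSelection): boolean {
  return ADAPTER_TIERS.every(tier => selection[tier] === 'redis')
}

const mixedSetup: AdapterSelection = { queue: 'redis', store: 'file', stream: 'memory' }
console.log(requiredAddons(mixedSetup))
console.log(supportsHorizontalScaling(mixedSetup))
```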

Event Sourcing

Every flow operation is stored as an event in streams:

nq:flow:<runId>
├─ flow.start
├─ step.started
├─ log
├─ step.completed
├─ step.started
├─ log
├─ step.completed
└─ flow.completed

Terminal states: flow.completed, flow.failed, flow.cancel, flow.stalled
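
Since a run's stream is the source of truth, its status can be derived by scanning for a terminal event. The sketch below does that for a list of event type strings. The status labels match those shown for Flow Runs in the Development UI; the function name `deriveRunStatus` and the mapping table are illustrative assumptions rather than nvent internals.

```typescript
type RunStatus = 'running' | 'completed' | 'failed' | 'canceled' | 'stalled'

// Terminal events from the README, mapped to the status labels used in the UI.
const STATUS_BY_TERMINAL_EVENT: Record<string, RunStatus> = {
  'flow.completed': 'completed',
  'flow.failed': 'failed',
  'flow.cancel': 'canceled',
  'flow.stalled': 'stalled',
}

// Return the status implied by the first terminal event, or 'running' if there is none.
function deriveRunStatus(eventTypes: string[]): RunStatus {
  for (const eventType of eventTypes) {
    if (Object.prototype.hasOwnProperty.call(STATUS_BY_TERMINAL_EVENT, eventType)) {
      return STATUS_BY_TERMINAL_EVENT[eventType]
    }
  }
  return 'running'
}

// The example stream shown above ends with flow.completed.
const exampleStream = [
  'flow.start', 'step.started', 'log', 'step.completed',
  'step.started', 'log', 'step.completed', 'flow.completed',
]
console.log(deriveRunStatus(exampleStream))  // completed
```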

Real-time Distribution

With Redis stream adapter, events are broadcast via Pub/Sub for instant UI updates (<100ms latency).

Function Context

Every function receives a rich context:

{
  jobId: string              // BullMQ job ID
  queue: string              // Queue name
  flowId: string             // Flow run UUID
  flowName: string           // Flow definition name
  stepName: string           // Current step name
  logger: {
    log(level, msg, meta)    // Structured logging
  },
  state: {
    get(key)                 // Get flow-scoped state
    set(key, value, opts)    // Set with optional TTL
    delete(key)              // Delete state
  },
  flow: {
    emit(eventName, data)    // Emit flow event to trigger subscribed steps
    startFlow(name, input)   // Start nested flow
    cancelFlow(name, runId)  // Cancel a running flow
    isRunning(name, runId?)  // Check if flow is running
    getRunningFlows(name)    // Get all running instances
  }
}
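
Because a function only talks to its context, step logic can be exercised in isolation with an in-memory stand-in. The sketch below types a subset of the context listed above and builds a mock that records log lines, emitted events, and state. The parameter types, the `{ ttl }` option shape, and the names `StepContextLike` and `createMockContext` are assumptions; the mock ignores TTLs entirely.

```typescript
// Subset of the context above; parameter types are assumed for illustration.
interface StepContextLike {
  flowName: string
  stepName: string
  logger: { log(level: string, msg: string, meta?: Record<string, unknown>): void }
  state: {
    get(key: string): Promise<unknown>
    set(key: string, value: unknown, opts?: { ttl?: number }): Promise<void>
    delete(key: string): Promise<void>
  }
  flow: { emit(eventName: string, data: unknown): void }
}

// Build an in-memory context plus the collections it records into.
function createMockContext(flowName: string, stepName: string) {
  const logs: Array<{ level: string; msg: string }> = []
  const emitted: Array<{ eventName: string; data: unknown }> = []
  const stateData: Record<string, unknown> = {}
  const ctx: StepContextLike = {
    flowName,
    stepName,
    logger: { log: (level, msg) => { logs.push({ level, msg }) } },
    state: {
      get: (key) => Promise.resolve(stateData[key]),
      set: (key, value) => { stateData[key] = value; return Promise.resolve() },
      delete: (key) => { delete stateData[key]; return Promise.resolve() },
    },
    flow: { emit: (eventName, data) => { emitted.push({ eventName, data }) } },
  }
  return { ctx, logs, emitted, stateData }
}

// Handler body modelled on the entry step from "Create a Flow", as a plain function.
async function startStep(job: { data: unknown }, ctx: StepContextLike) {
  ctx.logger.log('info', 'Flow started')
  const prepared = { step: 1, data: job.data }
  ctx.flow.emit('data.prepared', prepared)
  await ctx.state.set('preparedAt', new Date().toISOString())
  return prepared
}

const mockRun = createMockContext('my-flow', 'start')
startStep({ data: { input: 'data' } }, mockRun.ctx).then(() => {
  console.log(mockRun.emitted.map(e => e.eventName))
})
```

Keeping the handler body in a named function like this is one way to unit-test it before wrapping it in `defineFunction`.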

📚 Documentation

v0.4 Documentation

API & Advanced

Roadmap & Future

🔮 Roadmap

v0.4.5 (Current - November 2025)

✅ Core queue and flow orchestration
✅ Built-in memory/file adapters - zero setup required
✅ Optional Redis adapters for production scaling
✅ Modular package structure (core + addons)
✅ Event sourcing with pluggable stream adapters
✅ Real-time monitoring UI (separate @nvent-addon/app package)
✅ Flow scheduling (cron and delays)
✅ Flow control (cancel, query status, stall detection)
✅ Function context with state and logging
✅ Improved configuration structure

v0.5

  • 🎯 Comprehensive trigger system (schedule, webhook, event, manual)
  • ⏱️ Await patterns (time, event, condition)
  • 🐍 Python function support with RPC bridge
  • 🔗 Webhook triggers with auto-setup

v0.6

  • 🐘 PgBoss queue provider option
  • 🗄️ Postgres stream store adapter
  • 🔄 Unified state and event storage
  • 📊 Advanced state management

v0.7

  • 📊 Enhanced logger with multiple adapters
  • 🌐 HTTP mode for functions (REST/gRPC)
  • 🔌 External service hooks
  • 🎨 Pluggable function execution modes

See specs/roadmap.md for complete details.

🤝 Contributing

Contributions welcome! Please read our architecture docs first:

  1. Review specs/v0.4/current-implementation.md
  2. Check specs/roadmap.md for planned features
  3. Open an issue to discuss changes
  4. Submit a PR with tests

Development Setup

# Install dependencies
yarn install

# Start playground with dev UI
cd playground
yarn dev

# Run tests
yarn test

📄 License

MIT License - Copyright (c) DevJoghurt

About

Nuxt module that adds queue management and a worker build process based on BullMQ
