pikku-queue

$npx mdskill add pikkujs/pikku/pikku-queue

Manage background jobs with type-safe queues and progress tracking.

  • Enables async task processing, job retries, and distributed workers.
  • Integrates BullMQ and PgBoss adapters for reliable queue execution.
  • Decides execution based on job configuration and worker function types.
  • Delivers real-time progress updates and job status to the application.

SKILL.md

.github/skills/pikku-queueView on GitHub ↗
---
name: pikku-queue
description: 'Use when adding background job processing, async task queues, or distributed workers to a Pikku app. Covers wireQueueWorker, job enqueuing, progress tracking, retries, BullMQ and PgBoss adapters.
TRIGGER when: code uses wireQueueWorker, user asks about background jobs, task queues, async processing, BullMQ, PgBoss, or job retries.
DO NOT TRIGGER when: user asks about scheduled cron tasks (use pikku-cron) or event-driven triggers (use pikku-trigger).'
---

# Pikku Queue Wiring

Wire Pikku functions as background queue workers. Supports job control (progress, retry, discard), configurable concurrency, and type-safe job publishing.

## Before You Start

```bash
pikku info functions --verbose   # See existing functions and their types
pikku info tags --verbose        # Understand project organization
```

See `pikku-concepts` for the core mental model.

## API Reference

### `wireQueueWorker(config)`

```typescript
import { wireQueueWorker } from '@pikku/core/queue'

wireQueueWorker({
  name: string,            // Queue name (unique identifier)
  func: PikkuFunc,         // Worker function
  config?: {
    batchSize?: number,    // Process N jobs at once
    removeOnComplete?: number | boolean,  // Clean up completed jobs
  },
})
```

### Wire Object (`wire.queue`)

Inside queue worker functions:

```typescript
wire.queue.updateProgress(percent: number)  // Report progress (0-100)
wire.queue.discard(reason: string)          // Silently discard job
wire.queue.fail(reason: string)             // Mark job as failed
```

### Job Publishing

```typescript
const jobId = await queue.add(queueName, data, options?)
```

Options:

```typescript
{
  priority?: number,       // Higher = processed first
  delay?: number,          // Delay in ms before processing
  attempts?: number,       // Max retry attempts
  backoff?: {
    type: 'exponential' | 'fixed',
    delay: number,         // Base delay in ms
  },
}
```

## Usage Patterns

### Basic Queue Worker

```typescript
const processReminder = pikkuSessionlessFunc({
  title: 'Process Reminder',
  func: async ({ db, emailService }, { todoId, userId }) => {
    const todo = await db.getTodo(todoId)
    await emailService.sendReminder(userId, todo)
    return { sent: true }
  },
})

wireQueueWorker({
  name: 'todo-reminders',
  func: processReminder,
})
```

### Job Control (Progress, Discard, Fail)

```typescript
const processReminder = pikkuSessionlessFunc({
  title: 'Process Reminder',
  func: async ({ db }, { todoId }, wire) => {
    await wire.queue.updateProgress(25)

    const todo = await db.getTodo(todoId)
    if (!todo) {
      await wire.queue.discard('Todo not found')
      return
    }

    if (todo.completed) {
      await wire.queue.fail('Todo already completed')
      return
    }

    await wire.queue.updateProgress(100)
    return { sent: true }
  },
})
```

### Retries & Configuration

```typescript
wireQueueWorker({
  name: 'todo-reminders',
  func: processReminder,
  config: {
    batchSize: 5,
    removeOnComplete: 100,
  },
})

// Enqueue with retry options
const jobId = await queue.add(
  'todo-reminders',
  {
    todoId: 'abc-123',
    userId: 'user-456',
  },
  {
    priority: 10,
    delay: 5000,
    attempts: 3,
    backoff: { type: 'exponential', delay: 1000 },
  }
)
```

### Type-Safe Queue Publishing

After `npx pikku prebuild`:

```typescript
import { PikkuQueue } from '.pikku/pikku-queue.gen.js'

const queue = new PikkuQueue(queueService)

const jobId = await queue.add('todo-reminders', {
  todoId: 'abc-123',
  userId: 'user-456',
})

const job = await queue.getJob('todo-reminders', jobId)
const status = await job.status() // 'waiting' | 'active' | 'completed' | 'failed'
const result = await job.waitForCompletion(30_000)
```

### Queue Adapters

**BullMQ** (Redis-based):

```typescript
import { BullMQQueueService } from '@pikku/queue-bullmq'

const queueService = new BullMQQueueService({
  connection: { host: 'localhost', port: 6379 },
})
```

**PgBoss** (PostgreSQL-based):

```typescript
import { PgBossQueueService } from '@pikku/queue-pg-boss'

const queueService = new PgBossQueueService({
  connectionString: 'postgres://...',
})
```

## Complete Example

```typescript
// functions/email.functions.ts
export const sendWelcomeEmail = pikkuSessionlessFunc({
  title: 'Send Welcome Email',
  func: async ({ emailService, db }, { userId }, wire) => {
    await wire.queue.updateProgress(10)

    const user = await db.getUser(userId)
    if (!user) {
      await wire.queue.discard('User not found')
      return
    }

    await wire.queue.updateProgress(50)
    await emailService.send({
      to: user.email,
      subject: 'Welcome!',
      template: 'welcome',
      data: { name: user.name },
    })

    await wire.queue.updateProgress(100)
    return { sent: true, email: user.email }
  },
})

// wirings/queue.wiring.ts
wireQueueWorker({
  name: 'welcome-emails',
  func: sendWelcomeEmail,
  config: { removeOnComplete: 100 },
})

// Enqueue from another function
export const registerUser = pikkuSessionlessFunc({
  title: 'Register User',
  func: async ({ db, queue }, { email, name }) => {
    const user = await db.createUser({ email, name })
    await queue.add('welcome-emails', { userId: user.id })
    return { user }
  },
})
```

More from pikkujs/pikku

SkillDescription
pikku-addon'Use when creating or consuming reusable function packages (addons) in Pikku. Covers wireAddon, addon(), pikkuAddonServices, pikkuAddonWireServices, addon package structure, and cross-project function sharing.
pikku-ai-agent'Use when building AI agents, chatbots, or LLM-powered assistants with Pikku. Covers pikkuAIAgent, tool registration, memory, streaming, and agent invocation.
pikku-ai-vercel'Use when setting up AI agent execution with the Vercel AI SDK in a Pikku app. Covers VercelAIAgentRunner for streaming and non-streaming AI agent steps.
pikku-ai-voice'Use when adding voice input (speech-to-text) or voice output (text-to-speech) to AI agents in a Pikku app. Covers voiceInput/voiceOutput middleware hooks and STT/TTS service interfaces.
pikku-auth-js'Use when integrating Auth.js (NextAuth) with a Pikku app. Covers createAuthHandler, createAuthRoutes, and Auth.js configuration.
pikku-aws'Use when setting up AWS services (S3, SQS, Secrets Manager) in a Pikku app. Covers S3Content for file storage, SQSQueueService for queues, and AWSSecrets for secret management.
pikku-backblaze'Use when setting up Backblaze B2 file storage in a Pikku app. Covers B2Content for file uploads, downloads, and signed URLs.
pikku-cli'Use when building CLI commands with Pikku. Covers wireCLI, pikkuCLICommand, subcommands, options, parameters, custom renderers, and nested command groups.
pikku-concepts'Foundational guide to Pikku framework concepts. Use this skill when working with any Pikku codebase, starting a new Pikku project, or migrating a backend to Pikku. Covers the core mental model, function types, project structure, code generation, testing, and how Pikku maps to traditional backend patterns.
pikku-config'Use when managing secrets, environment variables, config, or OAuth2 credentials in a Pikku app. Covers wireSecret, wireVariable, wireOAuth2Credential, and typed config access.