NextMQ

Examples

Complete Next.js patterns you can lift into an app: enqueue from a route handler or server action, then process over the webhook. The examples cover retries, schedulers, rate limits, deduplication, flows, short waits, and permanent failures.

Every NextMQ feature uses the same three pieces: a shared job module that defines the queue and worker, the webhook route from the Quickstart that runs your processors, and your app code — a route handler or server action — that enqueues work. The first example shows all three; the rest focus on the job module, since the route stays the same.

End-to-end: welcome email with retries

The job module defines the queue, the worker, and a typed enqueue helper. Use attempts and exponential backoff to ride out transient email-provider failures, and keep the processor idempotent (for example by keying the send on job.id), because a retried job can otherwise repeat the side effect and send the email twice.

jobs/emails.ts
import { Queue, Worker } from '@nextmq/sdk'

type WelcomeEmail = { userId: string; email: string }
type EmailResult = { messageId: string }

export const emailQueue = new Queue<WelcomeEmail, EmailResult>('emails')

export const emailWorker = new Worker<WelcomeEmail, EmailResult>('emails', async (job) => {
  await job.log(`Sending welcome email to ${job.data.email}`)
  const messageId = await sendWelcomeEmail(job.data.email)
  return { messageId }
})

export async function enqueueWelcomeEmail(input: WelcomeEmail) {
  return emailQueue.add('welcome', input, {
    attempts: 5,
    backoff: { type: 'exponential', delay: 1000 },
    removeOnComplete: { count: 1000, age: 24 * 60 * 60 },
    removeOnFail: { count: 5000, age: 7 * 24 * 60 * 60 },
  })
}
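
To get a feel for what attempts: 5 with an exponential backoff of delay: 1000 means in practice, the sketch below lists the wait before each retry. It assumes the common doubling convention (base delay times 2^(retry - 1), no jitter); NextMQ's exact formula is not stated on this page, and backoffDelays is a hypothetical helper, not an SDK function.

```typescript
// Hypothetical helper for illustration; not part of the NextMQ SDK.
// Assumes the wait before retry n is baseDelayMs * 2^(n - 1), with no jitter.
function backoffDelays(attempts: number, baseDelayMs: number): number[] {
  const retries = Math.max(0, attempts - 1) // the first attempt is not a retry
  return Array.from({ length: retries }, (_, i) => baseDelayMs * 2 ** i)
}

// attempts: 5, delay: 1000 gives four retries
console.log(backoffDelays(5, 1000)) // [ 1000, 2000, 4000, 8000 ]
```

Under that assumption the job spends about 15 seconds waiting between attempts before it is marked failed, not counting the time each attempt takes.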

Mount the worker once in the catch-all webhook route. NextMQ calls this route to run the processor — it must use the Node.js runtime.

app/api/nextmq/[...path]/route.ts
import { createNextMQHandler } from '@nextmq/sdk/next'
import { emailWorker } from '@/jobs/emails'

export const runtime = 'nodejs'

export const { GET, POST } = createNextMQHandler({ workers: [emailWorker] })

Now enqueue from anywhere in your app. From a route handler after a signup:

app/api/signup/route.ts
import { enqueueWelcomeEmail } from '@/jobs/emails'

export async function POST(req: Request) {
  const { userId, email } = await req.json()

  await createUser({ userId, email })
  await enqueueWelcomeEmail({ userId, email }) // resolves once the job is queued; the worker sends the email later

  return Response.json({ ok: true })
}

Or from a server action, e.g. a form submit:

app/actions.ts
'use server'

import { enqueueWelcomeEmail } from '@/jobs/emails'

export async function signUp(formData: FormData) {
  const email = String(formData.get('email'))
  const user = await createUser({ email })

  await enqueueWelcomeEmail({ userId: user.id, email })
}
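
The advice to keep the processor idempotent with job.id can be sketched as a guard around the side effect. This is a minimal in-memory illustration: sendOnce and the sent map are hypothetical names, and the sketch assumes job.id is a string. A real app would persist the key in a database, or pass it to the email provider as an idempotency key, so that it survives across serverless invocations.

```typescript
// Hypothetical idempotency guard keyed by job id. In-memory for illustration only.
const sent = new Map<string, string>() // job id -> message id of the completed send

async function sendOnce(jobId: string, send: () => Promise<string>): Promise<string> {
  const previous = sent.get(jobId)
  if (previous !== undefined) return previous // a retry of a job that already sent

  const messageId = await send()
  sent.set(jobId, messageId) // record only after the send succeeded
  return messageId
}
```

Inside the processor this would wrap the send, for example sendOnce(job.id, () => sendWelcomeEmail(job.data.email)).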

Image thumbnail generation

Use this pattern for work that should not block the upload request. Report progress from the processor so the dashboard and API readers can show useful state.

jobs/images.ts
import { Queue, Worker } from '@nextmq/sdk'

type ThumbnailJob = { uploadId: string; sourceUrl: string }
type ThumbnailResult = { thumbnailUrl: string }

export const imageQueue = new Queue<ThumbnailJob, ThumbnailResult>('images')

export const imageWorker = new Worker<ThumbnailJob, ThumbnailResult>(
  'images',
  async (job) => {
    await job.updateProgress(10)
    const file = await downloadImage(job.data.sourceUrl)

    await job.updateProgress(60)
    const thumbnailUrl = await createThumbnail(file, job.data.uploadId)

    await job.updateProgress(100)
    return { thumbnailUrl }
  },
  { concurrency: 5, timeoutMs: 120_000 },
)

Daily report scheduler

Use a job scheduler when the queue should mint a job on a cron or interval. Create schedulers in startup code so a fresh deploy already has them.

instrumentation.ts
export async function register() {
  if (process.env.NEXT_RUNTIME !== 'nodejs') return

  const { ensureWorkersRegistered } = await import('@nextmq/sdk/next')
  const { reportQueue, reportWorker } = await import('@/jobs/reports')

  await ensureWorkersRegistered([reportWorker])
  await reportQueue.upsertJobScheduler(
    'daily-report',
    { pattern: '0 9 * * *', tz: 'Europe/Budapest' },
    {
      name: 'build-daily-report',
      data: { range: 'yesterday' },
      opts: { attempts: 3, backoff: { type: 'exponential', delay: 1000 } },
    },
  )
}
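
The pattern '0 9 * * *' is a standard five-field cron expression; combined with tz it reads as 09:00 every day in Europe/Budapest time. The sketch below only labels the five fields so the pattern is easier to read. describeCron is a hypothetical helper: it does not validate ranges or compute run times, and whether NextMQ also accepts other cron formats (such as a leading seconds field) is not covered here.

```typescript
// Hypothetical helper for illustration; not part of the NextMQ SDK.
type CronFields = {
  minute: string
  hour: string
  dayOfMonth: string
  month: string
  dayOfWeek: string
}

// Splits a five-field cron pattern into named fields. No range validation.
function describeCron(pattern: string): CronFields {
  const parts = pattern.trim().split(/\s+/)
  if (parts.length !== 5) {
    throw new Error(`Expected 5 cron fields, got ${parts.length}`)
  }
  const [minute, hour, dayOfMonth, month, dayOfWeek] = parts as [string, string, string, string, string]
  return { minute, hour, dayOfMonth, month, dayOfWeek }
}

// minute 0, hour 9, every day of month, every month, every day of week
console.log(describeCron('0 9 * * *'))
```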

Protect a provider quota

Use a worker limiter for steady throughput, and throw RateLimitError when the provider tells you exactly when to retry. That requeues the job without burning an attempt.

jobs/llm.ts
import { Queue, RateLimitError, Worker } from '@nextmq/sdk'

export const promptQueue = new Queue<{ prompt: string }, { text: string }>('prompts')

export const promptWorker = new Worker<{ prompt: string }, { text: string }>(
  'prompts',
  async (job) => {
    const res = await callModel(job.data.prompt)

    if (res.status === 429) {
      throw new RateLimitError(res.retryAfterMs ?? 1000)
    }

    if (!res.ok) throw new Error(res.error)
    return { text: res.text }
  },
  {
    concurrency: 10,
    limiter: { max: 60, duration: 60_000 },
  },
)
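
To make limiter: { max: 60, duration: 60_000 } concrete, here is a sketch of a fixed-window limiter: at most max jobs may start per duration milliseconds. It illustrates the concept only. How NextMQ enforces the limit internally (fixed window, sliding window, or something else) is not described on this page, and FixedWindowLimiter is a hypothetical name.

```typescript
// Hypothetical fixed-window limiter for illustration; not the SDK's implementation.
class FixedWindowLimiter {
  private readonly max: number
  private readonly durationMs: number
  private windowStart = -Infinity
  private count = 0

  constructor(max: number, durationMs: number) {
    this.max = max
    this.durationMs = durationMs
  }

  // Returns 0 if a job may start now, otherwise the ms until the next window opens.
  tryAcquire(nowMs: number): number {
    if (nowMs - this.windowStart >= this.durationMs) {
      this.windowStart = nowMs // open a fresh window
      this.count = 0
    }
    if (this.count < this.max) {
      this.count += 1
      return 0
    }
    return this.windowStart + this.durationMs - nowMs
  }
}

// 60 jobs per minute: the 61st job inside the same minute has to wait.
const limiter = new FixedWindowLimiter(60, 60_000)
```

The clock is passed in as nowMs so the behaviour is easy to test; in real code you would pass Date.now().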

Deduplicate double submits

Use jobId when your business object has one canonical job. Use deduplication when you want a time-window or replaceable dedup key.

Pattern            Use when
jobId              One order/export/import should have one job forever.
deduplication.id   Repeated clicks or webhooks should collapse for a TTL window.

await reportQueue.add(
  'export-report',
  { reportId },
  {
    jobId: `report-${reportId}`,
    attempts: 3,
  },
)

await emailQueue.add(
  'send-confirmation',
  { userId },
  {
    deduplication: { id: `confirmation-${userId}`, ttl: 60_000 },
  },
)

Heads up: Custom jobId values cannot contain ":" and cannot be integer strings. See the SDK reference.
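
The two ideas in this section can be sketched without the SDK: a check for the jobId rules above, and a TTL window that collapses repeats. Both helpers are hypothetical illustrations. The validator mirrors only the two documented rules (plus an assumed rejection of empty strings), and the deduper assumes ttl is in milliseconds, as the 60_000 in the example suggests. NextMQ's own dedup semantics may differ in detail.

```typescript
// Hypothetical helpers for illustration; not part of the NextMQ SDK.

// Mirrors the two documented rules: no ":" and not an integer string.
// Rejecting the empty string is an extra assumption.
function isValidCustomJobId(id: string): boolean {
  if (id.length === 0) return false
  if (id.includes(':')) return false
  if (/^-?\d+$/.test(id)) return false
  return true
}

// Collapses repeated requests for the same id inside a TTL window.
class TtlDeduper {
  private expiresAt = new Map<string, number>()

  // Returns true when the caller should enqueue, false when the request is a duplicate.
  shouldEnqueue(id: string, ttlMs: number, nowMs: number): boolean {
    const expiry = this.expiresAt.get(id)
    if (expiry !== undefined && nowMs < expiry) return false
    this.expiresAt.set(id, nowMs + ttlMs)
    return true
  }
}
```

Prefixing ids, as in report-${reportId}, is a simple way to stay valid even when the underlying id is numeric.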

Fan out and aggregate results

A flow parent runs after every child finishes. Read child return values in the parent with job.getChildrenValues().

jobs/video.ts
import { FlowProducer, Queue, Worker } from '@nextmq/sdk'

export const flow = new FlowProducer()
export const renderQueue = new Queue('video-render')
export const notificationQueue = new Queue('video-notify')

export async function enqueueVideoPipeline(videoId: string) {
  return flow.add({
    name: 'notify',
    queueName: 'video-notify',
    data: { videoId },
    children: [
      { name: 'transcode-1080p', queueName: 'video-render', data: { videoId, res: '1080p' } },
      { name: 'transcode-720p', queueName: 'video-render', data: { videoId, res: '720p' } },
      { name: 'thumbnail', queueName: 'video-render', data: { videoId, kind: 'thumb' } },
    ],
  })
}

export const notifyWorker = new Worker('video-notify', async (job) => {
  const children = await job.getChildrenValues()
  await sendReadyEmail(job.data, children)
  return { notified: true, childCount: Object.keys(children).length }
})
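
The ordering guarantee of a flow (children first, parent last, parent sees every child result) can be pictured with a small in-memory sketch. runFlow and Child are hypothetical names, and the record here is keyed by child name purely for readability; the actual key format returned by job.getChildrenValues() is not shown on this page.

```typescript
// Hypothetical in-memory model of a flow; not the SDK's FlowProducer.
type Child = { name: string; run: () => Promise<string> }

async function runFlow<R>(
  children: Child[],
  parent: (childValues: Record<string, string>) => Promise<R>,
): Promise<R> {
  // Children run in parallel; the parent starts only after all of them resolve.
  const results = await Promise.all(
    children.map(async (child) => ({ name: child.name, value: await child.run() })),
  )

  const childValues: Record<string, string> = {}
  for (const result of results) childValues[result.name] = result.value

  return parent(childValues)
}
```

If any child rejects, Promise.all rejects and the parent never runs in this sketch; how NextMQ treats a failed child in a real flow is a separate question for the SDK reference.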

Permanent validation failure

Throw UnrecoverableJobError for inputs that will never succeed. It fails the job immediately and skips remaining attempts.

import { UnrecoverableJobError, Worker } from '@nextmq/sdk'

export const importWorker = new Worker('imports', async (job) => {
  if (!job.data.fileUrl) {
    throw new UnrecoverableJobError('Missing fileUrl')
  }

  return runImport(job.data.fileUrl)
})
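
The difference between a retryable error and a permanent one can be sketched as a retry loop that stops early for one error class. PermanentError and runWithAttempts are hypothetical stand-ins for illustration; they are not the SDK's UnrecoverableJobError or its retry machinery, and the sketch leaves out backoff delays.

```typescript
// Hypothetical stand-in for a "do not retry" error class.
class PermanentError extends Error {
  constructor(message: string) {
    super(message)
    this.name = 'PermanentError'
    Object.setPrototypeOf(this, PermanentError.prototype) // keeps instanceof working on older compile targets
  }
}

// Runs fn up to `attempts` times, but gives up at once on a PermanentError.
async function runWithAttempts<T>(attempts: number, fn: () => Promise<T>): Promise<T> {
  let lastError: unknown = new Error('attempts must be at least 1')
  for (let attempt = 1; attempt <= attempts; attempt++) {
    try {
      return await fn()
    } catch (err) {
      if (err instanceof PermanentError) throw err // skip remaining attempts
      lastError = err // transient failure: try again
    }
  }
  throw lastError
}
```

Validation failures such as a missing fileUrl belong in the permanent class because no amount of retrying changes the input.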

Wait for a short result

It is fine to wait from a route handler when the work is short and the caller really needs the result. For long jobs, return the job id and let the client poll your app.

app/api/reports/preview/route.ts
import { reportQueue } from '@/jobs/reports'

export async function POST(req: Request) {
  const data = await req.json()
  const job = await reportQueue.add('preview-report', data, {
    attempts: 2,
    removeOnComplete: { count: 100, age: 3600 },
  })

  const result = await job.waitUntilFinished({
    timeoutMs: 15_000,
    pollIntervalMs: 500,
  })

  return Response.json(result)
}

Note: waitUntilFinished() polls over HTTP. If the job is removed immediately on completion, its result survives only as a short-lived snapshot, so start waiting before that snapshot expires or the call may fail with JobRemovedError.
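
The timeoutMs and pollIntervalMs options describe a plain poll-with-deadline loop, which the sketch below spells out. pollUntilDone and JobState are hypothetical names used for illustration; this is not the SDK's implementation of waitUntilFinished(), only the general shape of the technique. The same loop works on the client when you return a job id and let the browser poll your app.

```typescript
// Hypothetical polling helper for illustration; not the SDK's waitUntilFinished().
type JobState<T> = { done: false } | { done: true; value: T }

async function pollUntilDone<T>(
  check: () => Promise<JobState<T>>,
  opts: { timeoutMs: number; pollIntervalMs: number },
): Promise<T> {
  const deadline = Date.now() + opts.timeoutMs
  for (;;) {
    const state = await check()
    if (state.done) return state.value

    // Give up if the next poll would land after the deadline.
    if (Date.now() + opts.pollIntervalMs > deadline) {
      throw new Error(`Job did not finish within ${opts.timeoutMs} ms`)
    }
    await new Promise<void>((resolve) => setTimeout(resolve, opts.pollIntervalMs))
  }
}
```

With timeoutMs: 15_000 and pollIntervalMs: 500 this loop makes roughly 30 status checks at most, which is why the pattern suits short jobs only.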

Run code-free scheduled POSTs

If the work is just "POST this JSON to that URL on a schedule", use Custom Triggers from the dashboard instead of writing SDK code.