Examples
Complete Next.js patterns you can lift into an app: enqueue from a route handler or server action, then process over the webhook. The examples cover retries, schedulers, rate limits, deduplication, flows, short waits, and permanent failures.
Every NextMQ feature uses the same three pieces: a shared job module that defines the queue and worker, the webhook route from the Quickstart that runs your processors, and your app code — a route handler or server action — that enqueues work. The first example shows all three; the rest focus on the job module, since the route stays the same.
End-to-end: welcome email with retries
The job module defines the queue, the worker, and a typed enqueue helper. Use attempts with exponential backoff to ride out transient email-provider failures, and key the processor's idempotency on job.id if a repeated send would create a duplicate side effect.
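To make the retry budget concrete, here is a minimal sketch of the wait times that five attempts with a 1000 ms exponential backoff would produce. It assumes the common doubling rule (base delay times 2^(retry - 1)) used by BullMQ-style queues; NextMQ's exact schedule, including any jitter or cap, is not confirmed here, and `backoffDelays` is a hypothetical helper rather than part of the SDK.

```typescript
// Hypothetical helper, not part of @nextmq/sdk.
// Assumption: retry n (1-based) waits baseDelayMs * 2^(n - 1).
function backoffDelays(attempts: number, baseDelayMs: number): number[] {
  const delays: number[] = []
  // The first attempt runs right away, so there are attempts - 1 retries.
  for (let retry = 1; retry < attempts; retry++) {
    delays.push(baseDelayMs * 2 ** (retry - 1))
  }
  return delays
}

// Five attempts with a 1000 ms base delay.
const delays = backoffDelays(5, 1000)
const totalWaitMs = delays.reduce((sum, ms) => sum + ms, 0)
```

Under that assumption the four retries wait 1, 2, 4, and 8 seconds, so a job that keeps failing spends about 15 seconds waiting in total, plus processing time, before it is marked failed.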
import { Queue, Worker } from '@nextmq/sdk'

type WelcomeEmail = { userId: string; email: string }
type EmailResult = { messageId: string }

export const emailQueue = new Queue<WelcomeEmail, EmailResult>('emails')

export const emailWorker = new Worker<WelcomeEmail, EmailResult>('emails', async (job) => {
  await job.log(`Sending welcome email to ${job.data.email}`)
  const messageId = await sendWelcomeEmail(job.data.email)
  return { messageId }
})

export async function enqueueWelcomeEmail(input: WelcomeEmail) {
  return emailQueue.add('welcome', input, {
    attempts: 5,
    backoff: { type: 'exponential', delay: 1000 },
    removeOnComplete: { count: 1000, age: 24 * 60 * 60 },
    removeOnFail: { count: 5000, age: 7 * 24 * 60 * 60 },
  })
}

Mount the worker once in the catch-all webhook route. NextMQ calls this route to run the processor. The route must use the Node.js runtime.
import { createNextMQHandler } from '@nextmq/sdk/next'
import { emailWorker } from '@/jobs/emails'

export const runtime = 'nodejs'
export const { GET, POST } = createNextMQHandler({ workers: [emailWorker] })

Now enqueue from anywhere in your app. From a route handler after a signup:
import { enqueueWelcomeEmail } from '@/jobs/emails'

export async function POST(req: Request) {
  const { userId, email } = await req.json()
  await createUser({ userId, email })
  await enqueueWelcomeEmail({ userId, email }) // returns immediately
  return Response.json({ ok: true })
}

Or from a server action, e.g. a form submit:
'use server'

import { enqueueWelcomeEmail } from '@/jobs/emails'

export async function signUp(formData: FormData) {
  const email = String(formData.get('email'))
  const user = await createUser({ email })
  await enqueueWelcomeEmail({ userId: user.id, email })
}

Image thumbnail generation
Good for work that should not block the upload request. Track progress so the dashboard and API readers can show useful state.
import { Queue, Worker } from '@nextmq/sdk'

type ThumbnailJob = { uploadId: string; sourceUrl: string }
type ThumbnailResult = { thumbnailUrl: string }

export const imageQueue = new Queue<ThumbnailJob, ThumbnailResult>('images')

export const imageWorker = new Worker<ThumbnailJob, ThumbnailResult>(
  'images',
  async (job) => {
    await job.updateProgress(10)
    const file = await downloadImage(job.data.sourceUrl)
    await job.updateProgress(60)
    const thumbnailUrl = await createThumbnail(file, job.data.uploadId)
    await job.updateProgress(100)
    return { thumbnailUrl }
  },
  { concurrency: 5, timeoutMs: 120_000 },
)

Daily report scheduler
Use a job scheduler when the queue should mint a job on a cron or interval. Create schedulers in startup code so a fresh deploy already has them.
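As a reminder of how a standard five-field cron pattern such as `0 9 * * *` reads, the sketch below splits a pattern into named fields. `describeCron` is a hypothetical helper for illustration only: it does not validate ranges and makes no claim about how NextMQ parses patterns.

```typescript
// Hypothetical helper for reading standard five-field cron patterns.
type CronFields = {
  minute: string
  hour: string
  dayOfMonth: string
  month: string
  dayOfWeek: string
}

function describeCron(pattern: string): CronFields {
  const parts = pattern.trim().split(/\s+/)
  if (parts.length !== 5) {
    throw new Error(`Expected 5 cron fields, got ${parts.length}`)
  }
  // Field order: minute, hour, day of month, month, day of week.
  const [minute, hour, dayOfMonth, month, dayOfWeek] = parts as [
    string, string, string, string, string,
  ]
  return { minute, hour, dayOfMonth, month, dayOfWeek }
}

const daily = describeCron('0 9 * * *')
```

Here `daily` has minute `0`, hour `9`, and wildcards elsewhere, which reads as 09:00 every day. Paired with a time zone option, the schedule would presumably be evaluated in that zone rather than in UTC.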
export async function register() {
  if (process.env.NEXT_RUNTIME !== 'nodejs') return

  const { ensureWorkersRegistered } = await import('@nextmq/sdk/next')
  const { reportQueue, reportWorker } = await import('@/jobs/reports')

  await ensureWorkersRegistered([reportWorker])
  await reportQueue.upsertJobScheduler(
    'daily-report',
    { pattern: '0 9 * * *', tz: 'Europe/Budapest' },
    {
      name: 'build-daily-report',
      data: { range: 'yesterday' },
      opts: { attempts: 3, backoff: { type: 'exponential', delay: 1000 } },
    },
  )
}

Protect a provider quota
Use a worker limiter for steady throughput, and throw RateLimitError when the provider tells you exactly when to retry. That requeues the job without burning an attempt.
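The delay handed to `RateLimitError` usually comes from the provider's response. As a sketch, the hypothetical helper below converts an HTTP `Retry-After` header value (either a number of seconds or an HTTP date) into milliseconds. The helper name, the fallback value, and the premise that your provider sends `Retry-After` are all assumptions; the processor in this section reads a `retryAfterMs` field instead.

```typescript
// Hypothetical helper, not part of @nextmq/sdk.
// Converts a Retry-After header value into a delay in milliseconds.
function retryAfterToMs(header: string | null, nowMs: number, fallbackMs = 1000): number {
  if (header === null || header.trim() === '') return fallbackMs
  const value = header.trim()

  // Form 1: a delay in whole seconds, e.g. "120".
  if (/^\d+$/.test(value)) {
    return Number(value) * 1000
  }

  // Form 2: an HTTP date, e.g. "Wed, 21 Oct 2015 07:28:00 GMT".
  const retryAtMs = Date.parse(value)
  if (isNaN(retryAtMs)) return fallbackMs

  // A date in the past means "retry now", never a negative delay.
  return Math.max(0, retryAtMs - nowMs)
}
```

With a fetch-style response, a processor could then throw `new RateLimitError(retryAfterToMs(res.headers.get('retry-after'), Date.now()))` on a 429.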
import { Queue, RateLimitError, Worker } from '@nextmq/sdk'

export const promptQueue = new Queue<{ prompt: string }, { text: string }>('prompts')

export const promptWorker = new Worker(
  'prompts',
  async (job) => {
    const res = await callModel(job.data.prompt)
    if (res.status === 429) {
      throw new RateLimitError(res.retryAfterMs ?? 1000)
    }
    if (!res.ok) throw new Error(res.error)
    return { text: res.text }
  },
  {
    concurrency: 10,
    limiter: { max: 60, duration: 60_000 },
  },
)

Deduplicate double submits
Use jobId when your business object has one canonical job. Use deduplication when you want a time-window or replaceable dedup key.
| Pattern | Use when |
|---|---|
| jobId | One order/export/import should have one job forever. |
| deduplication.id | Repeated clicks or webhooks should collapse for a TTL window. |
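The difference between the two patterns can be modeled in a few lines. The sketch below is an in-memory stand-in, not NextMQ's implementation: `DedupModel` and its methods are hypothetical, and it assumes a TTL window opens when the first job is accepted and is not extended by later duplicates.

```typescript
// Hypothetical in-memory model of the two deduplication patterns.
class DedupModel {
  // jobId pattern: every id ever accepted.
  private seenJobIds = new Set<string>()
  // deduplication.id pattern: id -> time (ms) at which its window closes.
  private windowEnds = new Map<string, number>()

  // Returns true if a job with this jobId would be accepted.
  addWithJobId(jobId: string): boolean {
    if (this.seenJobIds.has(jobId)) return false
    this.seenJobIds.add(jobId)
    return true
  }

  // Returns true if a job with this dedup id would be accepted at nowMs.
  addWithDedup(id: string, ttlMs: number, nowMs: number): boolean {
    const end = this.windowEnds.get(id)
    if (end !== undefined && nowMs < end) return false
    this.windowEnds.set(id, nowMs + ttlMs)
    return true
  }
}
```

In this model a second `addWithJobId('report-1')` is always rejected, while `addWithDedup('confirmation-7', 60_000, t)` is rejected only while `t` falls inside the 60-second window opened by the first accepted call.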
await reportQueue.add(
  'export-report',
  { reportId },
  {
    jobId: `report-${reportId}`,
    attempts: 3,
  },
)

await emailQueue.add(
  'send-confirmation',
  { userId },
  {
    deduplication: { id: `confirmation-${userId}`, ttl: 60_000 },
  },
)

Fan out and aggregate results
A flow parent runs after every child finishes. Read child return values in the parent with job.getChildrenValues().
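What the parent does with those values is up to you. As an illustration, the sketch below folds a children-values record into a summary. It assumes the record maps a child key to that child's return value, consistent with the parent worker in this section counting entries with `Object.keys`, and that each child returns a hypothetical `{ url: string }` shape. `summarizeChildren` is not an SDK function.

```typescript
// Hypothetical shape returned by each render child.
type RenderResult = { url: string }

// Narrow an unknown child value to RenderResult.
function isRenderResult(value: unknown): value is RenderResult {
  return (
    typeof value === 'object' &&
    value !== null &&
    typeof (value as { url?: unknown }).url === 'string'
  )
}

// Collect URLs from well-formed children and list the keys of the rest.
function summarizeChildren(children: Record<string, unknown>) {
  const urls: string[] = []
  const malformed: string[] = []
  for (const key of Object.keys(children)) {
    const value = children[key]
    if (isRenderResult(value)) urls.push(value.url)
    else malformed.push(key)
  }
  return { urls, malformed }
}
```

Treating child values as `unknown` and narrowing them keeps the parent from crashing on a child that returned nothing or an unexpected shape.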
import { FlowProducer, Queue, Worker } from '@nextmq/sdk'

export const flow = new FlowProducer()
export const renderQueue = new Queue('video-render')
export const notificationQueue = new Queue('video-notify')

export async function enqueueVideoPipeline(videoId: string) {
  return flow.add({
    name: 'notify',
    queueName: 'video-notify',
    data: { videoId },
    children: [
      { name: 'transcode-1080p', queueName: 'video-render', data: { videoId, res: '1080p' } },
      { name: 'transcode-720p', queueName: 'video-render', data: { videoId, res: '720p' } },
      { name: 'thumbnail', queueName: 'video-render', data: { videoId, kind: 'thumb' } },
    ],
  })
}

export const notifyWorker = new Worker('video-notify', async (job) => {
  const children = await job.getChildrenValues()
  await sendReadyEmail(job.data, children)
  return { notified: true, childCount: Object.keys(children).length }
})

Permanent validation failure
Throw UnrecoverableJobError for inputs that will never succeed. It fails the job immediately and skips remaining attempts.
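One way to keep this distinction tidy is to validate the payload up front and reserve `UnrecoverableJobError` for validation failures. The sketch below is a hypothetical validator (`findPermanentProblem` is not an SDK function) for a payload with a `fileUrl` field; the http(s) check is an illustrative assumption about what counts as a usable URL.

```typescript
// Hypothetical validator: returns the reason a job can never succeed,
// or null if the payload looks usable.
function findPermanentProblem(data: unknown): string | null {
  if (typeof data !== 'object' || data === null) {
    return 'Payload must be an object'
  }
  const fileUrl = (data as { fileUrl?: unknown }).fileUrl
  if (typeof fileUrl !== 'string' || fileUrl === '') {
    return 'Missing fileUrl'
  }
  // Assumption for this sketch: only http(s) URLs are importable.
  if (!/^https?:\/\//.test(fileUrl)) {
    return 'fileUrl must be an http(s) URL'
  }
  return null
}
```

Inside a processor, a non-null reason would be thrown as `new UnrecoverableJobError(reason)`, while network errors from the import itself are left to throw normally so they still use the remaining attempts.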
import { UnrecoverableJobError, Worker } from '@nextmq/sdk'

export const importWorker = new Worker('imports', async (job) => {
  if (!job.data.fileUrl) {
    throw new UnrecoverableJobError('Missing fileUrl')
  }
  return runImport(job.data.fileUrl)
})

Wait for a short result
It is fine to wait from a route handler when the work is short and the caller really needs the result. For long jobs, return the job id and let the client poll your app.
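Conceptually, waiting with a timeout and a poll interval is a small loop, and the same loop works for a client polling your app for a long job. The sketch below is a generic illustration, not NextMQ's implementation: `pollUntilFinished`, the `JobState` shape, and the injected `sleep` and `now` functions are all hypothetical.

```typescript
// Hypothetical job state as your own status endpoint might report it.
type JobState<T> =
  | { status: 'pending' }
  | { status: 'completed'; result: T }
  | { status: 'failed'; reason: string }

// Poll fetchState until the job finishes or the time budget runs out.
async function pollUntilFinished<T>(
  fetchState: () => Promise<JobState<T>>,
  opts: { timeoutMs: number; pollIntervalMs: number },
  sleep: (ms: number) => Promise<void>,
  now: () => number,
): Promise<T> {
  const deadline = now() + opts.timeoutMs
  while (true) {
    const state = await fetchState()
    if (state.status === 'completed') return state.result
    if (state.status === 'failed') throw new Error(state.reason)
    // Stop if the next poll would land after the deadline.
    if (now() + opts.pollIntervalMs > deadline) {
      throw new Error('Timed out waiting for job')
    }
    await sleep(opts.pollIntervalMs)
  }
}
```

With real timers, `sleep` could be `(ms) => new Promise((resolve) => setTimeout(resolve, ms))` and `now` could be `Date.now`; injecting both keeps the loop easy to test with a fake clock.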
import { reportQueue } from '@/jobs/reports'

export async function POST(req: Request) {
  const data = await req.json()
  const job = await reportQueue.add('preview-report', data, {
    attempts: 2,
    removeOnComplete: { count: 100, age: 3600 },
  })
  const result = await job.waitUntilFinished({
    timeoutMs: 15_000,
    pollIntervalMs: 500,
  })
  return Response.json(result)
}

waitUntilFinished() polls over HTTP. If you use immediate removal, wait before the short result snapshot expires or you may see JobRemovedError.

Run code-free scheduled POSTs
If the work is just "POST this JSON to that URL on a schedule", use Custom Triggers from the dashboard instead of writing SDK code.