# NextMQ Docs

Generated from the public docs pages in `apps/site/app/docs`.

Commit: b6c190fc2fe6

## Pages

- Introduction: /docs
- Quickstart: /docs/quickstart
- How it works: /docs/how-it-works
- Queues & job options: /docs/queues
- Workers & processing: /docs/workers
- Inspecting & controlling jobs: /docs/jobs
- Schedulers & repeatables: /docs/schedulers
- Flows: /docs/flows
- Local development: /docs/local-development
- Registering on startup: /docs/register
- Deploying to Vercel: /docs/vercel
- Reliability & delivery: /docs/reliability
- Custom Triggers: /docs/custom-triggers
- Examples: /docs/examples
- Coming from BullMQ: /docs/from-bullmq
- Configuration: /docs/configuration
- SDK reference: /docs/sdk
- Testing: /docs/testing

## Introduction

Route: /docs

NextMQ is a fully managed BullMQ service for serverless Next.js. You write familiar Queue and Worker code; we host the queue server and run your jobs for you.

BullMQ is the job queue most apps reach for background jobs. But it assumes something impossible for serverless apps: a persistent process. To use it on Next.js you'd have to operate your own queue server. NextMQ runs that server for you.

Install the SDK, add your project keys, and write normal BullMQ code: your queues live on our managed BullMQ server, and your processors stay in your app, triggered through a signed webhook. Retries, scheduling, rate limits, and flows work by default — BullMQ's developer experience without the infrastructure.

```bash
npm install @nextmq/sdk
```

### The code shape stays familiar

Most producer and processor code starts as an import swap. The route and environment are the serverless pieces you add around it.
```diff
- import { Queue, Worker } from 'bullmq'
+ import { Queue, Worker } from '@nextmq/sdk'

  const emailQueue = new Queue('emails')

  const emailWorker = new Worker('emails', async (job) => {
    await sendEmail(job.data)
    return { sent: true }
  })
```

### What you get

- **No infrastructure.** We host and scale the queue server and Redis.
- **A familiar API.** `Queue`, `Worker`, and `FlowProducer` primitives — nothing new to learn.
- **Defaults that just work.** Retries, backoff, delays, priorities, rate limits, dedup, and flows.

### What you write

- Your queues and the processor functions that run your jobs.
- One webhook route that auto-registers your workers and verifies every call.
- A little config — your project keys and your app's public URL.

That's the whole surface area. Everything else is managed.

### Good fits

- Background work in a Next.js app on Vercel or any serverless platform.
- Media processing, AI/LLM calls, report generation, syncing to third-party APIs.
- Anything you'd use BullMQ for — without running BullMQ's infrastructure.

### Next steps

- Quickstart — create a project, add keys, ship your first job.
- How it works — what runs where, in one diagram.
- Queues & job options — everything you can pass to `add()`.

## Quickstart

Route: /docs/quickstart

From zero to a working background job in a Next.js app. There's no server or Redis to set up — that's the part we manage.

### Prerequisites

- A Next.js App Router app.
- A public URL for deployed jobs, or a tunnel for local development.

### 1. Create a project

Sign up and create a project in the NextMQ dashboard. You'll get one **connection string** — it bundles your server URL, API key, and webhook secret. Keep it handy for the next step.

### 2. Install the SDK

```bash
npm install @nextmq/sdk
```

### 3. Add your environment

Paste the connection string from your dashboard, plus your app's public URL.

File: .env.local

```bash
NEXTMQ_CONNECTION_STRING=nextmq://v1....
NEXT_PUBLIC_APP_URL=https://your-app.com
```

`NEXT_PUBLIC_APP_URL` is where NextMQ calls back to run your jobs. Keep the connection string server-only — it's a secret.

### 4. Define a queue and a worker

The same primitives as BullMQ. The processor is the code NextMQ calls back to run.

File: jobs/images.ts

```ts
import { Queue, Worker } from '@nextmq/sdk'

export const imageQueue = new Queue<{ uploadId: string }>('images')

export const imageWorker = new Worker('images', async (job) => {
  await job.updateProgress(25)
  const thumb = await makeThumbnail(job.data.uploadId)
  return { thumbnailUrl: thumb.url }
})

async function makeThumbnail(uploadId: string) {
  // Replace with your image pipeline.
  return { url: `https://cdn.example.com/thumbs/${uploadId}.jpg` }
}
```

### 5. Add the webhook route

One catch-all route handles every queue. It verifies signatures, dispatches jobs to the right worker, and exposes a `/health` path. It must run on the Node.js runtime.

File: app/api/nextmq/[...path]/route.ts

```ts
import { createNextMQHandler } from '@nextmq/sdk/next'
import { imageWorker } from '@/jobs/images'

export const runtime = 'nodejs'

export const { GET, POST } = createNextMQHandler({
  workers: [imageWorker],
})
```

### 6. Enqueue a job

From a route handler or server action:

File: app/api/upload/route.ts

```ts
import { imageQueue } from '@/jobs/images'

export async function POST(req: Request) {
  const { uploadId } = await req.json()

  const job = await imageQueue.add('thumbnail', { uploadId }, {
    attempts: 5,
    backoff: { type: 'exponential', delay: 1000 },
  })

  return Response.json({ queued: true, jobId: job.id })
}
```

### 7. See it run

Open the dashboard and look at the `images` queue, or wait for the result when the job is short enough for the original request to stay open.
```ts
const job = await imageQueue.add('thumbnail', { uploadId })

const result = await job.waitUntilFinished({
  timeoutMs: 15_000,
  pollIntervalMs: 500,
})
```

### What happens now

NextMQ stores the job, a real BullMQ worker picks it up, and it calls your `/api/nextmq` route to run `imageWorker`. If your processor throws, NextMQ retries with exponential backoff up to five attempts — no extra code from you.

### Keep going

- Queues & job options — every supported `add()` option.
- Workers & processing — concurrency, rate limits, and outcomes.
- Deploying to Vercel — env vars and keeping idle queues warm.
- Examples — copy-paste patterns for common jobs.

## How it works

Route: /docs/how-it-works

You write a queue and a processor. NextMQ runs the BullMQ server and Redis, schedules the work, and calls your processor back over a signed webhook.

Diagram: How a job flows. Your Next.js app includes Pages, Route handlers, Server actions, and the NextMQ SDK. It talks over HTTPS both ways to NextMQ, which runs the BullMQ server and Redis. The BullMQ-API-compatible SDK lives in your app and talks to NextMQ over HTTPS. We run the server and Redis for you.

The left side is your app: producers, processor code, route handler, and secrets. The right side is NextMQ: Redis, BullMQ workers, schedules, retries, and job state.

### Your side

`@nextmq/sdk` gives you the BullMQ-shaped `Queue`, `Worker`, and `FlowProducer` classes. There's no Redis connection in your app. Calling `queue.add()` sends an authenticated request to NextMQ. A `Worker` declares the processor function that should run when a job is triggered.

The one piece of plumbing you add is a webhook route built with `createNextMQHandler`. It verifies every signed request, registers your workers, and dispatches incoming jobs.

### Our side

NextMQ runs the real BullMQ `Queue` and `Worker` instances against a managed Redis. We own the scheduler, lock handling, retries, backoff, delays, priority, concurrency, and rate limiting.
Your jobs and their state are stored durably and isolated to your project.

Note: Job data, progress, logs, return values, errors, and state live in NextMQ storage. Your processor source code and server secrets stay in your app; NextMQ calls them over the signed webhook route.

### The lifecycle of a job

1. **Enqueue.** Your route calls `queue.add()`; the SDK sends it to NextMQ, which stores the job.
2. **Schedule.** Our BullMQ worker applies delay, priority, and rate limits, then picks the job up when it's due.
3. **Dispatch.** NextMQ signs the job and calls your webhook route.
4. **Process.** The handler verifies the signature and runs your processor function.
5. **Finalize.** We read your response and mark the job `completed`, `failed` (with retries/backoff), or rate-limited.

### Why a webhook instead of a local worker

A BullMQ worker needs a blocking Redis connection and timers that run continuously — which a serverless function, frozen between requests, can't hold. So that loop lives on our side, and your app only needs to be reachable for one request → one result. This is also why a few BullMQ APIs differ here; see Coming from BullMQ.

## Queues & job options

Route: /docs/queues

A Queue is a named producer. Creating one is local; `add()` and `addBulk()` send your jobs to NextMQ, which validates the options and runs them.

### Creating a queue

```ts
import { Queue } from '@nextmq/sdk'

type NotifyJob = { userId: string; channel: 'push' | 'sms' }

const notificationQueue = new Queue<NotifyJob>('notifications')
```

Note: Queue names must be 1-128 characters and may contain only letters, numbers, `_`, `-`, and `.`. Custom `jobId` values cannot contain `:` or be integer strings.
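
The naming rules in the note above can be checked before a request ever leaves your app. The following is a minimal sketch, not part of the SDK: `isValidQueueName` and `isValidJobId` are hypothetical helper names, "letters" is assumed to mean ASCII letters, and "integer string" is assumed to mean an optional minus sign followed by digits. NextMQ's own validation is authoritative and may differ in edge cases.

```ts
// Hypothetical helpers (not SDK exports) that mirror the documented naming rules.

// 1-128 characters; ASCII letters, digits, underscore, hyphen, and dot only (assumption: ASCII).
const QUEUE_NAME_PATTERN = /^[A-Za-z0-9_.-]{1,128}$/

function isValidQueueName(name: string): boolean {
  return QUEUE_NAME_PATTERN.test(name)
}

function isValidJobId(jobId: string): boolean {
  if (jobId.length === 0) return false
  // Custom ids cannot contain a colon.
  if (jobId.includes(':')) return false
  // Assumption: an "integer string" is an optional minus sign followed by digits only.
  return !/^-?\d+$/.test(jobId)
}

console.log(isValidQueueName('notifications')) // true
console.log(isValidQueueName('bad name'))      // false (space is not allowed)
console.log(isValidJobId('csv-42'))            // true
console.log(isValidJobId('42'))                // false (integer string)
```

Running a check like this next to your producers turns a rejected request into a local error with a clearer stack trace.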
### Adding jobs

```ts
// Single job
await notificationQueue.add('digest', { userId, channel: 'push' })

// With options
await notificationQueue.add('digest', { userId, channel: 'push' }, {
  delay: 5000,
  attempts: 3,
  backoff: { type: 'exponential', delay: 2000 },
  priority: 1,
})
```

#### addBulk

```ts
await notificationQueue.addBulk([
  { name: 'digest', data: { userId: 'u_1', channel: 'push' } },
  { name: 'digest', data: { userId: 'u_2', channel: 'sms' }, opts: { delay: 1000 } },
])
```

### Supported job options

Pass any of these in the third argument to `add()`. Options that can't survive a remote queue are rejected with a clear error, never silently dropped.

| Option | Description |
| --- | --- |
| delay | Milliseconds to wait before the job becomes eligible to run. |
| attempts | Total times to try the job before it's marked failed. |
| backoff | Retry delay strategy: { type: 'fixed' \| 'exponential', delay }. |
| priority | Lower number runs first. |
| jobId | Custom job id. It cannot contain ':' or be an integer string. |
| lifo | Push to the front of the queue instead of the back. |
| keepLogs | Max number of log lines to retain. |
| sizeLimit | Reject the job if its serialized data exceeds this many bytes. |
| removeOnComplete | Retention for completed jobs — see clamping below. |
| removeOnFail | Retention for failed jobs — see clamping below. |
| repeat | Limited repeatable config; prefer Job Schedulers. |
| deduplication | Collapse duplicate work — see below. |

### Retention

NextMQ automatically sets retention defaults, so completed and failed jobs don't pile up and your storage stays bounded. You can override them per job with `removeOnComplete`, `removeOnFail`, and `keepLogs` — see the SDK reference for the specifics.

### Deduplication

Pass a `deduplication` id to collapse duplicate work within a TTL window — handy when a form can be double-submitted or a request gets retried.
```ts
await reportQueue.add('csv', { reportId }, {
  deduplication: { id: `csv-${reportId}`, ttl: 60_000 },
})
```

Supported fields: `id`, `ttl`, `extend`, `replace`, and `keepLastIfActive`. The helpers `getDeduplicationJobId` and `removeDeduplicationKey` are available on the queue.

### Queue-wide throughput controls

`setGlobalConcurrency` and `setGlobalRateLimit` cap a whole queue's throughput — how many jobs run at once and how many start per window — and you can change them at runtime from the SDK or dashboard with no redeploy. Use them to throttle a queue protecting a shared third-party or AI API; the full method list is in the SDK reference.

### Inspecting and maintaining queues

Queue reads are paged HTTP calls. Use `getJobs()` or state helpers such as `getWaiting()` and `getFailed()` for lists, and count helpers such as `getWaitingCount()` for counters. Server-side maintenance methods include `pause()`, `resume()`, `drain()`, `clean()`, `retryJobs()`, `promoteJobs()`, and `obliterate()`.

Every `Queue` method is described in the SDK reference.

Tip: Need recurring jobs? Use Job Schedulers rather than ad-hoc `repeat` options.

## Workers & processing

Route: /docs/workers

A Worker declares the processor that runs your jobs. NextMQ runs the worker loop and calls your processor over a signed webhook — you just write the function.

### Declaring a worker

File: jobs/ai.ts

```ts
import { Worker } from '@nextmq/sdk'

export const summarizeWorker = new Worker('ai', async (job) => {
  await job.updateProgress(20)
  const summary = await llm.summarize(job.data.docId)
  return { summary }
}, {
  concurrency: 5,
  limiter: { max: 100, duration: 1000 }, // protect your model provider's quota
})
```

Note: One worker per queue, per project. A fresh `Worker` on cold start re-registers idempotently — you don't manage worker lifecycle by hand.
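
To make the `limiter: { max, duration }` option in the declaration above concrete, here is a minimal sketch of a fixed-window limiter: at most `max` job starts per `duration` milliseconds. It only illustrates the concept. `FixedWindowLimiter` is a hypothetical class, and the real limiting happens on the NextMQ server inside BullMQ, whose algorithm may differ.

```ts
// Illustrative only: a fixed-window rate limiter with an injectable clock.
class FixedWindowLimiter {
  private windowStart = Number.NEGATIVE_INFINITY
  private started = 0
  private readonly max: number
  private readonly duration: number

  constructor(max: number, duration: number) {
    this.max = max
    this.duration = duration
  }

  // Returns true if a job may start at `now` (ms), false if the current window is full.
  tryStart(now: number): boolean {
    if (now - this.windowStart >= this.duration) {
      // The previous window has elapsed, so open a new one.
      this.windowStart = now
      this.started = 0
    }
    if (this.started >= this.max) return false
    this.started += 1
    return true
  }
}

const limiter = new FixedWindowLimiter(2, 1000)
console.log(limiter.tryStart(0))    // true
console.log(limiter.tryStart(10))   // true
console.log(limiter.tryStart(20))   // false (window is full)
console.log(limiter.tryStart(1000)) // true (new window)
```

With `{ max: 100, duration: 1000 }`, the same idea means no more than 100 jobs start in any one-second window, regardless of `concurrency`.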
### Mounting the route

`createNextMQHandler` returns `GET` and `POST` handlers that verify signatures, dispatch jobs to the right worker, and serve a `/health` path. It must run on the Node.js runtime.

File: app/api/nextmq/[...path]/route.ts

```ts
import { createNextMQHandler } from '@nextmq/sdk/next'
import { summarizeWorker } from '@/jobs/ai'

export const runtime = 'nodejs'

export const { GET, POST } = createNextMQHandler({
  workers: [summarizeWorker],
})
```

### Registration

Registration publishes one deterministic webhook URL per queue. The handler registers its workers, but if producers can enqueue before the route is first hit, register explicitly from your bootstrap or producer path:

```ts
import { ensureWorkersRegistered, getWorkerRegistrationStatuses } from '@nextmq/sdk/next'
import { summarizeWorker } from '@/jobs/ai'

await ensureWorkersRegistered([summarizeWorker])

const statuses = await getWorkerRegistrationStatuses([summarizeWorker])
```

Liveness is inferred from **webhook delivery**, not an in-process heartbeat. Re-registration is idempotent: a new deploy can re-assert or update the canonical callback URL, and the live BullMQ worker restarts only when execution parameters change. Webhook secrets are provider-owned and are never sent during worker registration.

Tip: Schedule a cron that pings the handler's `/health` path to re-assert registration for queues that go idle. See Deploying to Vercel.

### Worker options

Most apps only need concurrency, rate limiting, and timeout:

| Option | Description |
| --- | --- |
| concurrency | Max jobs this worker processes in parallel. |
| limiter | Rate limit: { max, duration } per window. |
| timeoutMs | Max time to wait for your webhook response. Default 30 000 ms; max 300 000 ms. |

Heads up: Keep `timeoutMs` below your platform's function timeout. If the platform kills or freezes an invocation after NextMQ times out, BullMQ may retry while the old invocation is still finishing.
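
One way to follow the advice above is to derive `timeoutMs` from your platform's function timeout instead of hard-coding it. This is a sketch under stated assumptions: `pickTimeoutMs` is a hypothetical helper, the 5-second safety margin is an arbitrary illustrative choice, and the 300 000 ms ceiling is the maximum from the options table.

```ts
// Maximum `timeoutMs` from the worker options table.
const NEXTMQ_MAX_TIMEOUT_MS = 300_000

// Hypothetical helper: choose a webhook timeout that ends before the platform kills the function.
function pickTimeoutMs(platformTimeoutMs: number, safetyMarginMs: number = 5_000): number {
  const budget = platformTimeoutMs - safetyMarginMs
  if (budget <= 0) {
    throw new RangeError('platform timeout is too short for the chosen safety margin')
  }
  // Never exceed the documented maximum.
  return Math.min(budget, NEXTMQ_MAX_TIMEOUT_MS)
}

console.log(pickTimeoutMs(60_000))  // 55000
console.log(pickTimeoutMs(900_000)) // 300000
```

The result would be passed as the worker's `timeoutMs` option. Even with a margin, invocations can still overlap, so the processor needs to be idempotent either way.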
For long-running or stalled-job tuning, `lockDuration`, `lockRenewTime`, `stalledInterval`, `maxStalledCount`, and `drainDelay` are also accepted — most apps never set them. Bounds are in the SDK reference.

### Signalling job outcome

NextMQ maps your processor's behavior to job state:

- **Return a value** → the job is `completed` with that value.
- **Throw** → the job `failed`; retries and backoff apply.
- **Throw `UnrecoverableJobError`** → fail immediately, skipping remaining attempts.
- **Throw `RateLimitError`** → rate-limit and re-queue without burning an attempt.

```ts
import { Worker, UnrecoverableJobError, RateLimitError } from '@nextmq/sdk'

export const aiWorker = new Worker('ai', async (job) => {
  if (!job.data.docId) throw new UnrecoverableJobError('missing docId')

  const res = await llm.summarize(job.data.docId)
  if (res.status === 429) throw new RateLimitError(res.retryAfterMs ?? 1000)

  return { summary: res.summary }
})
```

#### Non-autorun workers

Pass `{ autorun: false }` when a worker should be mounted in the handler but not registered by default health checks. Register it explicitly with `includeNonAutorun`.

```ts
const backfillWorker = new Worker('backfills', processor, { autorun: false })

await ensureWorkersRegistered([backfillWorker], {
  includeNonAutorun: true,
})
```

Note: `worker.close()` is local-only. It stops using that local declaration but does not unregister the durable server-side worker. Intentional removal is an admin operation.

#### Pause & resume

Pause or resume processing for a queue:

```ts
await aiQueue.pause()
await aiQueue.resume()
```

Heads up: Webhook delivery is at-least-once, so processors must be idempotent. See Reliability & delivery guarantees for the delivery contract and the `job.id` idempotency pattern.

## Inspecting & controlling jobs

Route: /docs/jobs

Read job state, control jobs from a queue, and report progress and logs from inside a processor. Every call talks to NextMQ over HTTP.
### Fetching jobs

```ts
const job = await reportQueue.getJob(jobId)

const state = await job.getState() // 'waiting' | 'active' | 'completed' | ...
await job.refresh()                // re-fetch latest state

const recent = await reportQueue.getJobs(['waiting', 'active'], 0, 20)
const failed = await reportQueue.getFailed(0, 20)
const counts = await reportQueue.getJobCounts() // { waiting, active, failed, ... }
const total = await reportQueue.count()
```

### Reporting from inside a processor

Inside a worker processor, use the `job` handle to stream progress and logs back as the work runs.

```ts
export const exportWorker = new Worker('exports', async (job) => {
  await job.log('starting export')
  await job.updateProgress(10)

  const rows = await fetchRows(job.data)
  await job.updateProgress(60)

  await job.updateData({ ...job.data, rowCount: rows.length })
  return { url: await uploadCsv(rows) }
})
```

Read retained logs later from the queue:

```ts
const { logs, count } = await reportQueue.getJobLogs(jobId, 0, 99)
```

### Acting on jobs

| Method | What it does |
| --- | --- |
| retry() | Move a failed job back to waiting to try again. |
| promote() | Promote a delayed job so it runs now. |
| changeDelay(ms) | Reschedule a delayed job. |
| changePriority(opts) | Change a waiting job's priority. |
| remove(opts?) | Delete the job from the queue; pass { removeChildren: true } for flow trees. |
| updateData(data) | Replace the job's data payload. |
| updateProgress(n) | Report progress (number or object). |
| log(line) | Append a log line (retained up to keepLogs). |

```ts
const job = await reportQueue.getJob(jobId)

await job.changePriority({ priority: 1 })
await job.promote()
await job.retry()
await job.remove({ removeChildren: true })
```

Job instances also expose state helpers such as `isCompleted()`, `isFailed()`, `isDelayed()`, `isActive()`, `isWaiting()`, and `isWaitingChildren()`. The full Job surface — every property and method — is in the SDK reference.
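
Because list reads such as `getJobs()` and `getFailed()` are paged, reading a whole list means walking `start`/`end` ranges. The sketch below shows one way to do that, assuming the range is zero-based with an inclusive `end` (as in BullMQ) and that a short page marks the end of the list. `collectAll` and `PageFetcher` are hypothetical names, not SDK exports.

```ts
// A page fetcher takes a zero-based range with an inclusive end (assumption) and returns its items.
type PageFetcher<T> = (start: number, end: number) => Promise<T[]>

// Hypothetical helper: walk pages until a short page is returned or `maxItems` is reached.
async function collectAll<T>(
  fetchPage: PageFetcher<T>,
  pageSize: number = 20,
  maxItems: number = 1_000,
): Promise<T[]> {
  const items: T[] = []
  for (let start = 0; items.length < maxItems; start += pageSize) {
    const page = await fetchPage(start, start + pageSize - 1)
    items.push(...page)
    // A short (or empty) page means there is nothing left to read.
    if (page.length < pageSize) break
  }
  return items.slice(0, maxItems)
}

// Possible usage with a queue (not executed here):
//   const allFailed = await collectAll((start, end) => reportQueue.getFailed(start, end))
```

Jobs change state while you page, so treat the result as a snapshot that can contain gaps or duplicates. The `maxItems` cap keeps an unexpectedly large queue from turning into an unbounded loop of HTTP calls.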
#### Parents & child values

For flow parents, read child return values with `getChildrenValues()`. A child's parent reference is on `job.parent` and `job.parentKey`.

```ts
const values = await job.getChildrenValues() // { '<queueName>:<jobId>': returnValue, ... }
const parentRef = job.parent                 // { id, queueKey } | undefined
```

### Waiting for completion

`waitUntilFinished()` polls until the job reaches a terminal state. It's a convenience over re-fetching the job — there's no long-lived connection involved.

```ts
const job = await reportQueue.add('csv', { reportId })

const result = await job.waitUntilFinished({
  timeoutMs: 30_000,
  pollIntervalMs: 1000,
})
```

## Schedulers & repeatables

Route: /docs/schedulers

Job Schedulers are the forward-looking way to run recurring work. NextMQ owns the scheduling clock, so repeating jobs fire even while your functions are cold.

### Upserting a scheduler

`upsertJobScheduler` creates or updates a scheduler by key. Use a cron `pattern` or a fixed `every` interval. Upsert is idempotent, so it is safe to run from startup code.

```ts
import { Queue } from '@nextmq/sdk'

const reportQueue = new Queue('reports')

// Cron: every day at 09:00 in a fixed timezone.
await reportQueue.upsertJobScheduler('daily-report', {
  pattern: '0 9 * * *',
  tz: 'Europe/Berlin',
}, {
  name: 'report',
  data: { kind: 'daily' },
})

// Fixed interval: every 15 minutes.
await reportQueue.upsertJobScheduler('healthcheck', {
  every: 15 * 60 * 1000,
})
```

### Repeat options

Common cron patterns:

| Pattern | Runs |
| --- | --- |
| 0 * * * * | At the top of every hour. |
| 0 9 * * * | Every day at 09:00 in the scheduler timezone. |
| 0 9 * * 1 | Every Monday at 09:00. |

| Option | Description |
| --- | --- |
| pattern | Cron expression for the schedule. |
| every | Fixed interval in milliseconds (alternative to pattern). |
| limit | Maximum number of times to repeat. |
| tz | Timezone for cron evaluation. |
| startDate | Don't fire before this time. |
| endDate | Don't fire after this time. |
| immediately | Produce the first job right away rather than waiting a full interval. |

Note: Set `tz` for calendar schedules that must follow a specific timezone. Cron schedules with `tz` follow that timezone's civil calendar, including DST. Fixed `every` schedules are duration-based.

Note: Scheduler templates produce ordinary jobs with the given `name`, `data`, and safe job options. Template options cannot include `jobId`, `repeat`, `delay`, `deduplication`, or `callbackUrl`; scheduled jobs use the queue's canonical worker registration URL.

### What the worker receives

A scheduler creates ordinary jobs. The worker sees the template `name` and `data`, exactly as if you had called `queue.add()`.

```ts
await reportQueue.upsertJobScheduler(
  'daily-report',
  { pattern: '0 9 * * *', tz: 'Europe/Berlin' },
  { name: 'build-report', data: { range: 'yesterday' } },
)

new Worker('reports', async (job) => {
  // job.name === 'build-report'
  // job.data.range === 'yesterday'
})
```

### Reading and removing schedulers

```ts
// All schedulers on the queue
const all = await reportQueue.getJobSchedulers()

// A single scheduler by key
const one = await reportQueue.getJobScheduler('daily-report')

// Remove a scheduler
await reportQueue.removeJobScheduler('daily-report')
```

Note: NextMQ supports the modern Job Scheduler CRUD. Legacy repeatable-job management (`getRepeatableJobs`, `removeRepeatableByKey`) is intentionally omitted — two ways to manage the same thing is worse than one. A limited `repeat` option on `add()` still exists for simple cases, but schedulers are preferred.

### Why this works when functions sleep

The scheduler clock lives on NextMQ, not in your app. When a scheduled job is due, NextMQ produces it and calls your worker route via webhook — the same path as any other job. Your app doesn't need to be awake for the schedule to advance.
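
The earlier note on `tz` says cron schedules follow the civil calendar while fixed `every` schedules are duration-based. The sketch below illustrates that difference with plain date arithmetic across the 2024 start of daylight saving time in Berlin. It does not call NextMQ; `localHour` is a hypothetical helper built on the standard `Intl.DateTimeFormat` API.

```ts
// Hypothetical helper: the wall-clock hour (0-23) of an instant in a given IANA timezone.
function localHour(instant: Date, timeZone: string): number {
  const parts = new Intl.DateTimeFormat('en-US', {
    timeZone,
    hour: '2-digit',
    hour12: false,
  }).formatToParts(instant)
  const hour = parts.find((part) => part.type === 'hour')
  if (!hour) throw new Error('no hour part produced')
  // Some engines print midnight as "24" in 24-hour mode, so normalize it to 0.
  return Number(hour.value) % 24
}

const DAY_MS = 24 * 60 * 60 * 1000

// 09:00 in Berlin on the day before clocks move forward (CET is UTC+1).
const beforeDst = new Date('2024-03-30T08:00:00Z')
// What a duration-based 24-hour interval would produce next.
const fixedIntervalNext = new Date(beforeDst.getTime() + DAY_MS)

console.log(localHour(beforeDst, 'Europe/Berlin'))         // 9
console.log(localHour(fixedIntervalNext, 'Europe/Berlin')) // 10
```

A fixed 24-hour interval lands at 10:00 local time after the change, whereas a `0 9 * * *` pattern with `tz: 'Europe/Berlin'` is described as staying at 09:00 local time. That is the practical reason to prefer `pattern` plus `tz` for calendar-style schedules.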
Create important schedulers in startup registration so a fresh deployment has both worker registration and repeat jobs in place. The full scheduler method and option list is in the SDK reference.

Tip: A scheduler stops producing new jobs after its `limit` is reached or after `endDate`. Remove or upsert the scheduler again when you intentionally want to resume it.

## Flows

Route: /docs/flows

A flow is a tree of jobs where a parent waits for its children. FlowProducer creates the whole graph in one call; NextMQ runs each node as an ordinary job.

### Creating a flow

File: jobs/checkout.ts

```ts
import { FlowProducer } from '@nextmq/sdk'

const flow = new FlowProducer()

await flow.add({
  name: 'finalize-order',
  queueName: 'orders',
  data: { orderId },
  children: [
    { name: 'charge', queueName: 'payments', data: { orderId } },
    { name: 'reserve-stock', queueName: 'inventory', data: { orderId } },
  ],
})
```

The parent `finalize-order` only runs once both children complete. Children run in parallel, subject to each queue's concurrency and rate limits.

```text
orders:finalize-order
├─ payments:charge
└─ inventory:reserve-stock
```

### Reading child results

In the parent processor, gather the children's return values with `job.getChildrenValues()`.

```ts
export const orderWorker = new Worker('orders', async (job) => {
  const children = await job.getChildrenValues()
  // keyed by each child's job key → its return value:
  // { 'payments:42': { receiptId }, 'inventory:43': { reservationId } }
  return { finalized: true, children }
})
```

### Failure semantics

Control how a failing child affects its parent with these flags, passed in a child's `opts`:

| Flag | Effect when the child fails |
| --- | --- |
| failParentOnFailure | Fail the parent immediately. |
| continueParentOnFailure | Let the parent proceed even though this child failed. |
| ignoreDependencyOnFailure | Don't block the parent on this child's result. |
| removeDependencyOnFailure | Drop this child from the parent's dependency set. |

```ts
await flow.add({
  name: 'finalize-order',
  queueName: 'orders',
  data: { orderId },
  children: [
    {
      name: 'charge',
      queueName: 'payments',
      data: { orderId },
      opts: { attempts: 3, failParentOnFailure: true },
    },
    {
      name: 'send-analytics',
      queueName: 'analytics',
      data: { orderId },
      opts: { continueParentOnFailure: true },
    },
  ],
})
```

### Inspecting a flow

```ts
const tree = await flow.getFlow({ id: parentJobId, queueName: 'orders' })

// addBulk() creates multiple independent flows in one call:
await flow.addBulk([flowA, flowB])
```

The `FlowProducer` methods and child options are listed in the SDK reference.

## Local development

Route: /docs/local-development

NextMQ runs your processors by calling your webhook route over the public internet, so localhost isn't reachable on its own. Expose your local server through a tunnel and point callbacks at it.

### Why you need a tunnel

Producing jobs from localhost works out of the box — that's just an outbound HTTPS call to NextMQ. The catch is the other direction: NextMQ runs your jobs by calling your webhook route, and `http://localhost:3000` only exists on your machine. A tunnel gives your local server a public HTTPS URL; you tell NextMQ to deliver callbacks there with `NEXTMQ_WEBHOOK_BASE_URL`.

### Environment

File: .env.local

```bash
NEXTMQ_CONNECTION_STRING=nextmq://v1....
NEXT_PUBLIC_APP_URL=http://localhost:3000
# The public tunnel origin from the next step:
NEXTMQ_WEBHOOK_BASE_URL=https://your-tunnel.example.com
```

Tip: Use a **separate project** for local development so test jobs never touch your production queues. Each project has its own connection string — set the dev one here and the prod one in your deployment.

### Pick a tunnel

| Tunnel | Install | Account needed |
| --- | --- | --- |
| Cloudflare Tunnel | brew install cloudflared | No (for quick tunnels). |
| ngrok | brew install ngrok | Yes — free authtoken. |

### Option A — Cloudflare Tunnel

With your Next.js dev server running on port 3000, start a quick tunnel in another terminal:

```bash
cloudflared tunnel --url http://localhost:3000
```

It prints a `https://<random-subdomain>.trycloudflare.com` URL. Put that in `NEXTMQ_WEBHOOK_BASE_URL` and restart `next dev` so the value is picked up.

### Option B — ngrok

Add your authtoken once (from the ngrok dashboard), then start the tunnel:

```bash
ngrok config add-authtoken <your-authtoken>
ngrok http 3000
```

Copy the `https` _Forwarding_ URL into `NEXTMQ_WEBHOOK_BASE_URL` and restart `next dev`.

Heads up: On both free tiers the public URL **changes every restart**. When it changes, update `NEXTMQ_WEBHOOK_BASE_URL` and restart Next so workers re-register at the new origin. A reserved ngrok domain or a named Cloudflare tunnel gives you a stable URL.

### Register and verify

Once the dev server is up with the tunnel set, hit the health path. It registers your workers at the tunnel origin and confirms NextMQ can reach you:

```bash
curl -fsS https://your-tunnel.example.com/api/nextmq/health
```

A 200 with registered workers means callbacks will arrive. To register workers automatically on boot, see Registering on startup.

### Related pages

- Configuration — the full environment reference.
- Registering on startup — auto-register on boot.

## Registering on startup

Route: /docs/register

By default you register workers by calling `ensureWorkersRegistered` (e.g. by hitting the handler's `/health` path), and create schedulers by hand. Move both into your app's startup so a fresh deploy is wired up the moment it boots.

### The register() hook

Next.js calls the `register` export in `instrumentation.ts` once when the server starts. It is the place to register your workers and create your schedulers — no route has to be hit and no script has to be run.

File: instrumentation.ts

```ts
export async function register() {
  // Server-only: skip the Edge runtime.
  if (process.env.NEXT_RUNTIME !== 'nodejs') return

  const { ensureWorkersRegistered } = await import('@nextmq/sdk/next')
  const { Queue } = await import('@nextmq/sdk')
  const { summarizeWorker } = await import('@/jobs/ai')

  try {
    // Publish each worker's webhook URL up front.
    await ensureWorkersRegistered([summarizeWorker])

    // Create repeatable schedulers so a fresh deploy already has them.
    await new Queue('reports').upsertJobScheduler('daily-report', {
      pattern: '0 9 * * *',
    })
  } catch (err) {
    // Best-effort: never crash boot. You can still register by hitting /api/nextmq/health.
    console.error('[nextmq] startup registration failed', err)
  }
}
```

Note: Use dynamic `import()` inside `register` and guard on `NEXT_RUNTIME === 'nodejs'` so server-only SDK code never loads in the Edge runtime. Both calls are idempotent, so they are safe to run on every boot.

### Why startup, not a route call

Until something calls `ensureWorkersRegistered` — at boot, or by hitting `/api/nextmq/health` — a fresh deploy has no registered worker, and a scheduler only exists once you upsert it. Registering on boot means producers can enqueue and schedulers tick the moment you deploy, with no "remember to ping the route" step. Registration is durable, so you do it once per deploy, not on a timer.

### What runs when

| Environment | register() fires |
| --- | --- |
| Long-running Node server | Once, at process boot. |
| Serverless (Vercel) | On each cold start — i.e. the first request that boots an instance, not at deploy time. Idempotent, so safe to repeat. |

Note: Registration uses the same `NEXTMQ_CONNECTION_STRING` and `NEXT_PUBLIC_APP_URL` as the rest of the SDK — see Configuration. On Next.js 15+, `instrumentation.ts` is stable with no flag; older versions need `experimental.instrumentationHook` in `next.config`.

## Deploying to Vercel

Route: /docs/vercel

There's no infrastructure to deploy — NextMQ runs the queue. You just deploy your Next.js app with the right environment.
### Set your environment

Add the connection string and your app URL in Project → Settings → Environment Variables. Use the **same connection string across all environments** (Production, Preview, and Development) — that's what makes preview routing work.

```bash
NEXTMQ_CONNECTION_STRING=nextmq://v1....
NEXT_PUBLIC_APP_URL=https://your-app.com
```

See Configuration for what each value does.

### Preview deployments just work

Each Vercel deployment gets a unique URL. NextMQ handles that for you: your production deployment registers the queue's canonical consumer, and every deployment stamps its own URL on the jobs it enqueues. The server delivers each job back to the deployment that created it — so a preview's jobs run on that preview, signed with the shared webhook secret from your connection string.

```text
Production deploy
  registers canonical worker URL -> https://your-app.com/api/nextmq/worker/emails

Preview deploy
  enqueues a job with callbackUrl -> https://branch-git-feature.vercel.app/api/nextmq/worker/emails

Scheduler jobs
  use the canonical worker URL because they are not created by a preview request
```

Heads up: **Deployment Protection.** Preview URLs are often gated by Vercel Authentication, which will reject NextMQ's webhook with a 401. Enable _Protection Bypass for Automation_ (or disable protection for previews) so deliveries reach them.

### Pinning a callback origin (optional)

To send every callback to one fixed origin instead of per-deployment routing, set `NEXTMQ_WEBHOOK_BASE_URL`. When set, it overrides the per-deployment URL and all jobs call back there.

```bash
NEXTMQ_WEBHOOK_BASE_URL=https://your-app.com
```

### Register workers on deploy

A worker publishes its webhook URL to NextMQ once, and that registration is durable — it doesn't lapse while a queue sits idle, so there's nothing to keep alive. Do it at boot with startup registration (recommended), or hit `/api/nextmq/health` once after your first deploy.
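
The callback URLs shown earlier on this page share one shape: an origin, the route mount path, then `/worker/<queue>`. The sketch below reconstructs that shape from environment values, with `NEXTMQ_WEBHOOK_BASE_URL` taking precedence as described under pinning. It is an illustration only. `resolveWorkerCallbackUrl` is a hypothetical helper, the `/api/nextmq` mount path is assumed from the Quickstart route, and the SDK's real resolution (including per-deployment preview URLs) is not reproduced here.

```ts
// The two environment values this page describes for callback origins.
type CallbackEnv = {
  NEXT_PUBLIC_APP_URL?: string
  NEXTMQ_WEBHOOK_BASE_URL?: string
}

// Hypothetical helper: build the worker callback URL for one queue.
function resolveWorkerCallbackUrl(
  env: CallbackEnv,
  queueName: string,
  mountPath: string = '/api/nextmq',
): string {
  // A pinned origin wins over the app URL when both are set.
  const origin = env.NEXTMQ_WEBHOOK_BASE_URL || env.NEXT_PUBLIC_APP_URL
  if (!origin) {
    throw new Error('Set NEXT_PUBLIC_APP_URL or NEXTMQ_WEBHOOK_BASE_URL')
  }
  // Strip trailing slashes so the path joins cleanly.
  const base = origin.replace(/\/+$/, '')
  return `${base}${mountPath}/worker/${encodeURIComponent(queueName)}`
}

console.log(resolveWorkerCallbackUrl({ NEXT_PUBLIC_APP_URL: 'https://your-app.com' }, 'emails'))
// https://your-app.com/api/nextmq/worker/emails
```

A helper like this is mainly useful in a pre-deploy check or a log line, so you can see which origin callbacks will target before the first job is enqueued.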
### Local development NextMQ calls your app over the public internet, so `localhost` needs a tunnel. See Local development for ngrok and Cloudflare Tunnel setup. ### Separate dev and prod Want isolated queues, keys, and metrics per environment? Create a separate project — each gets its own connection string — and set the dev one for Preview + Development and the prod one for Production. Isolation is per-project, so dev can never touch prod data. Note: Not on Vercel? Any platform works the same way — expose the Node.js route at a public HTTPS URL, keep `NEXTMQ_CONNECTION_STRING` server-only, and set `NEXTMQ_WEBHOOK_BASE_URL` if your callback origin differs from `NEXT_PUBLIC_APP_URL`. ### Before you ship - The same `NEXTMQ_CONNECTION_STRING` set across all environments. - `NEXT_PUBLIC_APP_URL` points at your deployed domain. - Preview Deployment Protection bypassed for automation, if you use previews. - Processors are idempotent — see Reliability & delivery guarantees. - Workers register on deploy — via startup registration or a one-time `/api/nextmq/health` hit. ## Reliability & delivery guarantees Route: /docs/reliability NextMQ runs real BullMQ workers, but your processor runs over a signed webhook. This page is the production contract for that boundary. ### Delivery is at-least-once A job may be delivered more than once. The common causes are webhook timeouts, platform function timeouts, network failures, retries, and replay of a recently signed request. Treat processors as idempotent units of work. File: jobs/payouts.ts ```ts export const payoutWorker = new Worker('payouts', async (job) => { if (await payouts.hasProcessed(job.id)) { return { skipped: true } } const transfer = await bank.transfer(job.data) await payouts.recordProcessed(job.id, transfer.id) return { transferId: transfer.id } }) ``` Tip: Use `job.id` for business idempotency. Use `job.deliveryId` when you need per-attempt logs or deduplication. 
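The check-then-record pattern in the payout example leaves a small window in which two overlapping deliveries can both pass `hasProcessed`. One way to narrow it is an atomic claim keyed by `job.id`, with `job.deliveryId` kept for per-attempt logs. The sketch below uses an in-memory map as a stand-in for a database unique constraint; `ClaimStore`, `handleDelivery`, and the `Delivery` shape are hypothetical and not part of the SDK.

```ts
// The two identifiers the tip mentions; everything else here is illustrative.
type Delivery = { id: string; deliveryId: string }

type Outcome<T> = { skipped: true } | { skipped: false; result: T }

// In-memory stand-in for a table with a unique constraint on the job id.
// A real app would use its database so the claim is shared across instances.
class ClaimStore {
  private readonly claimed = new Map<string, string>() // job id -> winning deliveryId

  // Returns true only for the first caller per job id (set-if-absent).
  claim(jobId: string, deliveryId: string): boolean {
    if (this.claimed.has(jobId)) return false
    this.claimed.set(jobId, deliveryId)
    return true
  }

  // Releases a claim so a later retry can run after a failed attempt.
  release(jobId: string): void {
    this.claimed.delete(jobId)
  }
}

async function handleDelivery<T>(
  store: ClaimStore,
  job: Delivery,
  sideEffect: () => Promise<T>,
  log: string[],
): Promise<Outcome<T>> {
  // Per-attempt log line, keyed by deliveryId.
  log.push(`delivery ${job.deliveryId} for job ${job.id}`)

  // Business idempotency, keyed by the job id: only one delivery may proceed.
  if (!store.claim(job.id, job.deliveryId)) return { skipped: true }

  try {
    return { skipped: false, result: await sideEffect() }
  } catch (err) {
    store.release(job.id) // let the next attempt claim again
    throw err
  }
}
```

The trade-off in this sketch is that a duplicate arriving while the first attempt is still running is skipped instead of waiting for it. Whether that is acceptable depends on the job.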
### Retries and outcomes NextMQ maps the result of your processor to BullMQ job state. Attempts and backoff apply the same way they do for a BullMQ worker, but the signal crosses the webhook response. | Processor behavior | Server result | | --- | --- | | Return a value | Complete the job with that return value. | | Throw an Error | Fail this attempt; attempts and backoff decide whether it retries. | | UnrecoverableJobError | Fail the job immediately without using remaining attempts. | | RateLimitError | Rate-limit the queue and re-queue the job without burning an attempt. | | job.moveToDelayed(timestamp) | Move the job back to delayed until the absolute timestamp. | #### Retry example ```ts await emailQueue.add('welcome', { userId }, { attempts: 5, backoff: { type: 'exponential', delay: 1000 }, }) ``` ### Timeouts can overlap `timeoutMs` is how long NextMQ waits for your webhook response. Your hosting platform has its own function timeout. Keep `timeoutMs` below the platform timeout, and still expect overlap: a timed-out invocation may continue while BullMQ retries the job. Heads up: Timeout safety comes from idempotency, not from assuming there is only one running invocation. ### Stalled jobs NextMQ owns BullMQ locks and stall recovery on the server. If a delivery times out, the worker process dies, or the server loses the active lock, BullMQ can retry the job according to its stalled-job rules. The worker options `lockDuration`, `lockRenewTime`, `stalledInterval`, and `maxStalledCount` exist for advanced tuning, but most apps should leave them at the defaults and focus on idempotency. ### Signatures prove recency, not uniqueness The webhook handler verifies the request signature and timestamp before running your processor. That proves the request was signed by NextMQ recently. It does not make the request one-time; a captured request can be replayed inside the signature tolerance window. 
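To see why a valid signature is not a one-time token, here is a minimal sketch of timestamped HMAC verification. The signing format (HMAC-SHA256 over `timestamp.body`), the five-minute tolerance, and the helper names are assumptions for illustration only; the real check happens inside the webhook handler and may differ.

```ts
import { createHmac, timingSafeEqual } from 'node:crypto'

const TOLERANCE_MS = 5 * 60 * 1000 // hypothetical tolerance window

// Hypothetical signing format: HMAC-SHA256 over "<timestamp>.<body>", hex-encoded.
function signRequest(secret: string, timestamp: number, body: string): string {
  return createHmac('sha256', secret).update(`${timestamp}.${body}`).digest('hex')
}

// True when the signature matches and the timestamp is inside the tolerance window.
// Nothing here remembers earlier calls, so the same request passes every time.
function isRecentAndSigned(
  secret: string,
  timestamp: number,
  body: string,
  signature: string,
  now: number,
): boolean {
  if (Math.abs(now - timestamp) > TOLERANCE_MS) return false
  const expected = Buffer.from(signRequest(secret, timestamp, body), 'hex')
  const received = Buffer.from(signature, 'hex')
  // timingSafeEqual throws on length mismatch, so compare lengths first.
  return expected.length === received.length && timingSafeEqual(expected, received)
}

// Uniqueness needs separate state, for example remembering handled delivery ids.
const seenDeliveries = new Set<string>()

function acceptOnce(deliveryId: string): boolean {
  if (seenDeliveries.has(deliveryId)) return false
  seenDeliveries.add(deliveryId)
  return true
}
```

Calling `isRecentAndSigned` twice with the same captured request returns `true` both times while the timestamp is fresh. Only the separate `acceptOnce` state, or an idempotent processor, turns a replay into a no-op.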
### Waiting for completion `waitUntilFinished()` polls the job endpoint until the job reaches `completed` or `failed`. Native Redis pub/sub `QueueEvents` is not available in the serverless model. Note: If a job is removed immediately with `removeOnComplete` or `removeOnFail`, NextMQ keeps a short terminal-result snapshot, about 60 seconds. After that, polling raises `JobRemovedError`. ### Backpressure and 503s When Redis storage is near capacity, job-creating routes can return 503. Reads, callbacks, cleanup, and worker registration keep working so queues can recover. Treat enqueue 503s as retryable with backoff. ### Where to go next - SDK reference for technical bounds and the full surface. - Workers & processing for worker options. ## Custom Triggers Route: /docs/custom-triggers A Custom Trigger is a scheduled, signed HTTP POST to a URL you own — set up entirely in the dashboard. Reach for one when you want cron-like automation that calls an endpoint on a schedule. Note: **Dashboard-only.** Triggers are created and managed from the Triggers page in the dashboard — there's no SDK or producer code to write in your app. ### When to use one | Use a trigger | Use SDK code | | --- | --- | | Call a public endpoint on a cron or interval. | You need producer code or business logic before enqueue. | | An operator should pause, fire, or delete it from the dashboard. | A Next.js app that should run a Worker processor. | ### Create a trigger 1. Open the dashboard and go to **Triggers**. 2. Enter a name and the target URL (`http` or `https`). 3. Choose a cron pattern or a fixed interval. 4. Set the JSON payload, plus optional retries and backoff. ### What your endpoint receives On each fire, NextMQ sends a POST whose body is **exactly the JSON payload you configured**. Any `2xx` response counts as success; the body of your response is ignored. A non-2xx (or an unreachable endpoint) is a failure and retries if you enabled retries. 
File: request body ```json { "source": "trigger", "report": "daily" } ``` Note: Requests are signed with `x-nextmq-signature` and `x-nextmq-timestamp`. Verifying is optional — if you want to confirm a call came from NextMQ, check the HMAC with the webhook secret from your connection string. ### Related pages - Schedulers & repeatables - Reliability & delivery guarantees ## Examples Route: /docs/examples Complete Next.js patterns you can lift into an app: enqueue from a route handler or server action, then process over the webhook. Retries, schedulers, rate limits, deduplication, flows, short waits, and permanent failures. Every NextMQ feature uses the same three pieces: a shared **job module** that defines the queue and worker, the **webhook route** from the Quickstart that runs your processors, and your **app code** — a route handler or server action — that enqueues work. The first example shows all three; the rest focus on the job module, since the route stays the same. ### End-to-end: welcome email with retries The job module defines the queue, the worker, and a typed `enqueue` helper. Use attempts and exponential backoff for transient email providers, and keep the processor idempotent with `job.id` if the send can create a side effect. 
File: jobs/emails.ts

```ts
import { Queue, Worker } from '@nextmq/sdk'

type WelcomeEmail = { userId: string; email: string }
type EmailResult = { messageId: string }

// Type the queue so the payload and result passed through it are checked.
export const emailQueue = new Queue<WelcomeEmail, EmailResult>('emails')

export const emailWorker = new Worker('emails', async (job) => {
  await job.log(`Sending welcome email to ${job.data.email}`)
  // sendWelcomeEmail is your own mail-provider helper (not shown).
  const messageId = await sendWelcomeEmail(job.data.email)
  return { messageId }
})

export async function enqueueWelcomeEmail(input: WelcomeEmail) {
  return emailQueue.add('welcome', input, {
    attempts: 5,
    backoff: { type: 'exponential', delay: 1000 },
    // Retention: 1,000 jobs or 1 day when completed, 5,000 jobs or 7 days when failed.
    removeOnComplete: { count: 1000, age: 24 * 60 * 60 },
    removeOnFail: { count: 5000, age: 7 * 24 * 60 * 60 },
  })
}
```

Mount the worker once in the catch-all webhook route. NextMQ calls this route to run the processor, so it must use the Node.js runtime.

File: app/api/nextmq/[...path]/route.ts

```ts
import { createNextMQHandler } from '@nextmq/sdk/next'
import { emailWorker } from '@/jobs/emails'

export const runtime = 'nodejs'

export const { GET, POST } = createNextMQHandler({ workers: [emailWorker] })
```

Now enqueue from anywhere in your app. From a route handler after a signup:

File: app/api/signup/route.ts

```ts
import { enqueueWelcomeEmail } from '@/jobs/emails'

export async function POST(req: Request) {
  const { userId, email } = await req.json()
  // createUser is your own persistence helper (not shown).
  await createUser({ userId, email })
  await enqueueWelcomeEmail({ userId, email }) // returns immediately
  return Response.json({ ok: true })
}
```

Or from a server action, e.g. a form submit:

File: app/actions.ts

```ts
'use server'

import { enqueueWelcomeEmail } from '@/jobs/emails'

export async function signUp(formData: FormData) {
  const email = String(formData.get('email'))
  const user = await createUser({ email })
  await enqueueWelcomeEmail({ userId: user.id, email })
}
```

### Image thumbnail generation

Good for work that should not block the upload request. Track progress so the dashboard and API readers can show useful state.
File: jobs/images.ts

```ts
import { Queue, Worker } from '@nextmq/sdk'

type ThumbnailJob = { uploadId: string; sourceUrl: string }
type ThumbnailResult = { thumbnailUrl: string }

export const imageQueue = new Queue<ThumbnailJob, ThumbnailResult>('images')

export const imageWorker = new Worker(
  'images',
  async (job) => {
    // downloadImage and createThumbnail are your own helpers (not shown).
    await job.updateProgress(10)
    const file = await downloadImage(job.data.sourceUrl)
    await job.updateProgress(60)
    const thumbnailUrl = await createThumbnail(file, job.data.uploadId)
    await job.updateProgress(100)
    return { thumbnailUrl }
  },
  { concurrency: 5, timeoutMs: 120_000 },
)
```

### Daily report scheduler

Use a job scheduler when the queue should mint a job on a cron or interval. Create schedulers in startup code so a fresh deploy already has them.

File: instrumentation.ts

```ts
export async function register() {
  if (process.env.NEXT_RUNTIME !== 'nodejs') return

  const { ensureWorkersRegistered } = await import('@nextmq/sdk/next')
  const { reportQueue, reportWorker } = await import('@/jobs/reports')

  await ensureWorkersRegistered([reportWorker])

  await reportQueue.upsertJobScheduler(
    'daily-report',
    { pattern: '0 9 * * *', tz: 'Europe/Budapest' },
    {
      name: 'build-daily-report',
      data: { range: 'yesterday' },
      opts: { attempts: 3, backoff: { type: 'exponential', delay: 1000 } },
    },
  )
}
```

### Protect a provider quota

Use a worker limiter for steady throughput, and throw `RateLimitError` when the provider tells you exactly when to retry. That requeues the job without burning an attempt.

File: jobs/llm.ts

```ts
import { Queue, RateLimitError, Worker } from '@nextmq/sdk'

export const promptQueue = new Queue<{ prompt: string }, { text: string }>('prompts')

export const promptWorker = new Worker(
  'prompts',
  async (job) => {
    // callModel is your own provider client (not shown).
    const res = await callModel(job.data.prompt)
    if (res.status === 429) {
      throw new RateLimitError(res.retryAfterMs ?? 1000)
    }
    if (!res.ok) throw new Error(res.error)
    return { text: res.text }
  },
  {
    concurrency: 10,
    limiter: { max: 60, duration: 60_000 },
  },
)
```

### Deduplicate double submits

Use `jobId` when your business object has one canonical job. Use `deduplication` when you want a time-window or replaceable dedup key.

| Pattern | Use when |
| --- | --- |
| jobId | One order/export/import should have one job forever. |
| deduplication.id | Repeated clicks or webhooks should collapse for a TTL window. |

```ts
await reportQueue.add(
  'export-report',
  { reportId },
  {
    jobId: `report-${reportId}`,
    attempts: 3,
  },
)

await emailQueue.add(
  'send-confirmation',
  { userId },
  {
    deduplication: { id: `confirmation-${userId}`, ttl: 60_000 },
  },
)
```

Heads up: Custom `jobId` values cannot contain `:` and cannot be integer strings. See SDK reference.

### Fan out and aggregate results

A flow parent runs after every child finishes. Read child return values in the parent with `job.getChildrenValues()`.

File: jobs/video.ts

```ts
import { FlowProducer, Queue, Worker } from '@nextmq/sdk'

export const flow = new FlowProducer()
export const renderQueue = new Queue('video-render')
export const notificationQueue = new Queue('video-notify')

export async function enqueueVideoPipeline(videoId: string) {
  return flow.add({
    name: 'notify',
    queueName: 'video-notify',
    data: { videoId },
    children: [
      { name: 'transcode-1080p', queueName: 'video-render', data: { videoId, res: '1080p' } },
      { name: 'transcode-720p', queueName: 'video-render', data: { videoId, res: '720p' } },
      { name: 'thumbnail', queueName: 'video-render', data: { videoId, kind: 'thumb' } },
    ],
  })
}

export const notifyWorker = new Worker('video-notify', async (job) => {
  // Runs only after all three children have finished.
  const children = await job.getChildrenValues()
  await sendReadyEmail(job.data, children)
  return { notified: true, childCount: Object.keys(children).length }
})
```

### Permanent validation failure

Throw `UnrecoverableJobError` for inputs that will never succeed.
It fails the job immediately and skips remaining attempts. ```ts import { UnrecoverableJobError, Worker } from '@nextmq/sdk' export const importWorker = new Worker('imports', async (job) => { if (!job.data.fileUrl) { throw new UnrecoverableJobError('Missing fileUrl') } return runImport(job.data.fileUrl) }) ``` ### Wait for a short result It is fine to wait from a route handler when the work is short and the caller really needs the result. For long jobs, return the job id and let the client poll your app. File: app/api/reports/preview/route.ts ```ts import { reportQueue } from '@/jobs/reports' export async function POST(req: Request) { const data = await req.json() const job = await reportQueue.add('preview-report', data, { attempts: 2, removeOnComplete: { count: 100, age: 3600 }, }) const result = await job.waitUntilFinished({ timeoutMs: 15_000, pollIntervalMs: 500, }) return Response.json(result) } ``` Note: `waitUntilFinished()` polls over HTTP. If you use immediate removal, wait before the short result snapshot expires or you may see `JobRemovedError`. ### Run code-free scheduled POSTs If the work is just "POST this JSON to that URL on a schedule", use Custom Triggers from the dashboard instead of writing SDK code. ## Coming from BullMQ Route: /docs/from-bullmq NextMQ runs real BullMQ under the hood, so the code you already know just works — same Queue, Worker, and FlowProducer. The few differences all come from one upgrade: we run, scale, and operate the queue for you. ### Import diff ```diff - import { Queue, Worker, FlowProducer } from 'bullmq' + import { Queue, Worker, FlowProducer } from '@nextmq/sdk' + import { createNextMQHandler } from '@nextmq/sdk/next' ``` ### Migration checklist 1. Swap BullMQ imports for `@nextmq/sdk`. 2. Add the `createNextMQHandler` route and import every worker into it. 3. Set `NEXTMQ_CONNECTION_STRING` and `NEXT_PUBLIC_APP_URL`. 4. Remove your Redis connection and always-on worker bootstrap. 5. 
Replace native `QueueEvents` waits with `job.waitUntilFinished()` or app polling. 6. Move repeatable setup into startup registration. ### Works exactly the same Import these from `@nextmq/sdk` and use them like you always have: - `new Queue()`, `queue.add()`, `queue.addBulk()` - `new Worker(name, processor, opts)` with `concurrency` and `limiter` - `attempts`, `backoff`, `delay`, `priority`, `lifo`, `jobId` - Job Schedulers — `upsertJobScheduler` and friends - `FlowProducer` graphs with parent/child failure flags - `deduplication`, global concurrency and rate limits - Inspection & control — `getJob`, `getJobCounts`, `retry`, `promote`, `changeDelay`, `changePriority`, `updateProgress`, `log`, `remove` ### Works a little differently | In BullMQ | In NextMQ | | --- | --- | | The `Worker` runs the job loop in your process | We run the loop and call your processor over a signed webhook. You declare the same processor function. | | Outcome via `moveToCompleted` / `moveToFailed` | Prefer `return value` / `throw error`. The `moveTo*` methods also work as terminal signals — see below. | | Await results via `QueueEvents` pub/sub | `job.waitUntilFinished()` awaits completion for you — no live connection to keep open. | | Retention defaults to keep-forever | Sensible retention is applied automatically so storage stays lean — tune it per job with `removeOnComplete` / `removeOnFail`. | ```ts // BullMQ: the worker runs in-process. // NextMQ: the same processor, declared in your app and run on demand. export const emailWorker = new Worker('emails', async (job) => { await send(job.data) return { ok: true } // return = completed, throw = failed (with retries) }) ``` ### Scaling: one worker per queue In BullMQ you scale a queue by running more `Worker` processes that compete for its jobs. 
NextMQ runs one worker per queue on our side, so scaling is simpler: set the worker's `concurrency`, and your serverless platform spins up as many function instances as it needs to handle the concurrent webhook calls. There's no worker fleet to run or autoscale. A queue is competing-consumers, exactly like BullMQ: each job runs **once**, it isn't broadcast to every worker. To make one event fan out to several independent handlers, give each handler its own queue and produce to all of them, or model the dependency with a flow. ### moveTo* methods For BullMQ source compatibility, `moveToCompleted`, `moveToFailed`, and `moveToDelayed` are supported inside processors as **terminal outcome signals**. After you call one, processor execution stops — you don't need to return, throw, pass a lock token, or throw BullMQ's special errors. `return` / `throw` stays the recommended style; these are here so existing BullMQ code moves over unchanged. ```ts new Worker('emails', async (job) => { if (job.data.skip) { await job.moveToCompleted({ skipped: true }) } // unreachable when skipped await doWork() return { done: true } }) ``` | Method | Terminal outcome | | --- | --- | | moveToCompleted(value) | Completes the job with `value` — same as `return value`. | | moveToFailed(error) | Fails the job — same as `throw error`. `UnrecoverableJobError` skips remaining retries. | | moveToDelayed(timestamp) | Re-runs the job at the given absolute ms timestamp. | `moveToWaitingChildren` waits on children created while a parent runs. NextMQ declares children up front with `FlowProducer` and runs the parent only after they finish, so there's nothing to wait on — model parent/child graphs with `FlowProducer` and you get the same result with less wiring. Heads up: These throw an internal control-flow signal to stop the processor, so a broad `try/catch` can accidentally swallow them. 
Re-throw with `isNextMQControlFlowError`: ```ts import { isNextMQControlFlowError } from '@nextmq/sdk' try { await job.moveToCompleted({ ok: true }) } catch (err) { if (isNextMQControlFlowError(err)) throw err // handle real errors here } ``` ### What NextMQ handles for you NextMQ owns the worker loop, locks, threads, and connections — so the low-level BullMQ APIs that exist to manage those yourself simply aren't part of the SDK. There's nothing to wire up, and a managed path covers each case: - Lock and token plumbing — `extendLock`, `getNextJob`, `processJob` — is managed for you. - Parent/child graphs use `FlowProducer` instead of `moveToWaitingChildren`. - No worker threads or sandboxed processor files to configure (`useWorkerThreads`). - Completion is a quick poll with `waitUntilFinished()` in place of in-process `cancelJob` / `QueueEvents`. - Payloads are plain JSON — easy to log, replay, and inspect in the dashboard. New here instead? Start with the Quickstart. ## Configuration Route: /docs/configuration The SDK is configured entirely by environment variables. There's no setup call — it reads them on first use, in every server runtime, for both producers and the route handler. ### The two you need File: .env ```bash NEXTMQ_CONNECTION_STRING=nextmq://v1.... NEXT_PUBLIC_APP_URL=https://your-app.com ``` | Variable | Required | Description | | --- | --- | --- | | NEXTMQ_CONNECTION_STRING | Yes | One value from your dashboard. Bundles the server URL, your API key, and the webhook secret. Server-only — never expose it to the browser. | | NEXT_PUBLIC_APP_URL | Yes | Your app's public URL — where NextMQ calls back to run your jobs. | Heads up: **One key, one secret — and both are sensitive.** The connection string bundles a single full-scope project key (enqueue, inspect, register workers, retry, remove, pause, obliterate) and a distinct webhook secret for signing callbacks. Keep it server-only — never put it in browser code or a `NEXT_PUBLIC_*` variable. 
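A small boot-time check can catch a missing or leaked value early. This is a sketch under stated assumptions, not an SDK feature: `checkNextMQEnv` is a hypothetical helper, the `nextmq://` prefix test is inferred from the sample value shown in this guide, and the three manual transport variables are the ones listed under Manual mode.

```ts
type Env = Record<string, string | undefined>

// Transport variables that can replace the connection string (see Manual mode).
const MANUAL_VARS = ['NEXTMQ_SERVER_URL', 'NEXTMQ_API_KEY', 'NEXTMQ_WEBHOOK_SECRET']

// Returns human-readable problems; an empty list means the basics look right.
function checkNextMQEnv(env: Env): string[] {
  const problems: string[] = []

  const conn = env.NEXTMQ_CONNECTION_STRING
  if (conn) {
    // Assumption: connection strings start with "nextmq://", as in the sample value.
    if (!conn.startsWith('nextmq://')) {
      problems.push('NEXTMQ_CONNECTION_STRING should start with nextmq://')
    }
  } else {
    const missing = MANUAL_VARS.filter((name) => !env[name])
    if (missing.length > 0) {
      problems.push(`set NEXTMQ_CONNECTION_STRING, or the manual variables: ${missing.join(', ')}`)
    }
  }

  const appUrl = env.NEXT_PUBLIC_APP_URL
  if (!appUrl) {
    problems.push('NEXT_PUBLIC_APP_URL is not set')
  } else {
    try {
      new URL(appUrl) // throws on a malformed URL
    } catch {
      problems.push('NEXT_PUBLIC_APP_URL is not a valid URL')
    }
  }

  // NEXT_PUBLIC_* values reach the browser, so the secret must never appear in one.
  for (const [name, value] of Object.entries(env)) {
    if (conn && name.startsWith('NEXT_PUBLIC_') && value?.includes(conn)) {
      problems.push(`${name} exposes the connection string to the browser`)
    }
  }

  return problems
}
```

Calling `checkNextMQEnv(process.env)` from startup code and logging the result is enough to surface a misconfigured deploy before the first job is enqueued.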
### Manual mode Instead of a connection string you can set the three transport values directly — handy for local development against a throwaway project. If `NEXTMQ_CONNECTION_STRING` is set, it wins over the manual transport variables. ```bash NEXTMQ_SERVER_URL=http://localhost:3001 NEXTMQ_API_KEY=nextmq_dev_key NEXTMQ_WEBHOOK_SECRET=dev_secret_1234567890 NEXT_PUBLIC_APP_URL=http://localhost:3000 ``` ### Optional | Variable | Default | Description | | --- | --- | --- | | NEXTMQ_WEBHOOK_BASE_URL | — | Override the callback origin when it differs from NEXT_PUBLIC_APP_URL (e.g. a tunnel in local dev, or pinning previews to one URL). | | NEXTMQ_REQUEST_TIMEOUT_MS | 15000 | Per-request timeout for SDK calls to NextMQ. Max 300000. | | NEXTMQ_WEBHOOK_BASE_PATH | /api/nextmq | Path your webhook route is mounted at. Must match where the route file lives. | | NEXTMQ_WEBHOOK_BODY_LIMIT_BYTES | 1048576 | Max accepted webhook payload size before the route responds 413. Max 10485760. | ### How it resolves Configuration is read **lazily on first use** and cached per server runtime, so there's no module to import and no ordering to worry about — a producer in a server action and the webhook handler each pick it up from the environment independently. Tip: **Dynamic config?** If you fetch secrets from a vault at boot, just set `process.env.NEXTMQ_CONNECTION_STRING` before the first queue call (e.g. in Next.js `instrumentation.ts`). It's still env — there's no separate configure API. ## SDK reference Route: /docs/sdk Every public class, method, option, error, and limit in one place. The guides teach the common paths with real code; this page is the exhaustive source of truth for the technical detail. 
### Imports ```ts import { FlowProducer, JobRemovedError, NextMQHttpError, NextMQRequestTimeoutError, Queue, RateLimitError, UnrecoverableJobError, Worker, isNextMQControlFlowError, resetNextMQConfig, type Job, type Processor, type RemoteJobOptions, type RemoteWorkerOptions, } from '@nextmq/sdk' import { WorkerRegistrationError, createNextMQHandler, ensureWorkersRegistered, getWorkerRegistrationStatuses, } from '@nextmq/sdk/next' ``` ### Classes at a glance | Class | What it is | Guide | | --- | --- | --- | | Queue | Produce jobs and inspect/control a queue. Creating one is local; methods are HTTP calls. | Queues | | Worker | Declares the processor that runs a queue's jobs, plus its execution options. | Workers | | Job | A single job — read its state, mutate it, or signal an outcome from a processor. | Jobs | | FlowProducer | Create and inspect parent/child job graphs. | Flows | | createNextMQHandler | Builds the signed webhook route that runs your workers. | Quickstart | Note: `Queue` and `FlowProducer` take no Redis connection — the SDK reads its config from the environment and talks to NextMQ over HTTP. Every method below is an `async` HTTP call unless noted. See Configuration. ### Queue ```ts const queue = new Queue(name) ``` #### Produce | Method | Description | | --- | --- | | add(name, data, opts?) | Enqueue one job. opts is the job-options set below. Returns the created Job. | | addBulk(jobs) | Enqueue many jobs in one call ({ name, data, opts? }[]). Up to 1,000 per call. | #### Read | Method | Description | | --- | --- | | getJob(jobId) | Fetch one job by id, or undefined if it no longer exists. | | getJobs(states, start?, end?) | Fetch jobs in the given states (paged; default 0–99, max 1,000 per request). | | `getWaiting` / `getActive` / `getCompleted` / `getFailed` / `getDelayed` / `getPrioritized` / `getWaitingChildren` (start?, end?) | Paged fetch of jobs in a single state. 
| | `count()` / `getJobCounts(...states?)` | Total job count, or counts per state. | | `getWaitingCount` / `getActiveCount` / … (per-state counters) | Count for a single state. | | getJobLogs(jobId, start?, end?) | Read a job's log lines (paged, retained up to keepLogs). | | getSummary() | Counts and paused state in one call — handy for a dashboard row. | #### Control & maintenance | Method | Description | | --- | --- | | pause() | Stop the queue from starting new jobs; in-flight jobs finish. Use during an incident or migration. | | resume() | Resume a paused queue. | | isPaused() | Whether the queue is currently paused. | | drain(delayed?) | Remove waiting jobs (and delayed when delayed=true), keeping active ones — clear a backlog without deleting the queue. | | clean(grace, limit, state) | Bulk-remove jobs in a state older than grace ms (e.g. old completed/failed). limit caps how many per call. | | retryJobs(opts?) | Move failed (or completed) jobs back to waiting — recover after a downstream outage. | | promoteJobs(opts?) | Move delayed jobs to waiting now. | | obliterate(opts?) | Delete the queue and all its data. Destructive; pass { force: true } to skip the active-jobs guard. | Heads up: Control methods can be destructive and run under your full-scope key. Keep `NEXTMQ_CONNECTION_STRING` server-only. #### Schedulers | Method | Description | | --- | --- | | upsertJobScheduler(id, repeat, template?) | Create or update a scheduler by id (idempotent). repeat = the scheduler options below; template sets the produced job's name/data/opts. | | getJobSchedulers(start?, end?) | List the queue's schedulers (paged). | | getJobScheduler(id) | Fetch one scheduler, or undefined. | | removeJobScheduler(id) | Delete a scheduler; returns whether one was removed. | #### Queue-wide throughput Runtime, queue-level throttles you can change without redeploying — see Workers for how they relate to a worker's own `concurrency` and `limiter`. 
| Method | Description | | --- | --- | | setGlobalConcurrency(n) | Cap how many of the queue's jobs run at once, across the board. | | getGlobalConcurrency() | Current global concurrency, or null if unset. | | removeGlobalConcurrency() | Clear the global concurrency cap. | | setGlobalRateLimit(max, duration) | Cap how many jobs start per duration ms window. | | getGlobalRateLimit() | Current global rate limit { max, duration }, or null. | | removeGlobalRateLimit() | Clear the global rate limit. | #### Deduplication | Method | Description | | --- | --- | | getDeduplicationJobId(dedupId) | The job id currently holding a deduplication key, or null. | | removeDeduplicationKey(dedupId) | Release a deduplication key early so the next add is no longer collapsed. | ### Worker ```ts const worker = new Worker(queueName, processor, options?) ``` | Member | Description | | --- | --- | | processor(job) | Your function. Return a value to complete the job; throw to fail it (retries apply). | | run() | Start processing. The webhook handler calls this for you; rarely needed directly. | | close() | Local only — stops this declaration. It does not unregister the durable server-side worker. | | autorun | Whether the handler registers this worker by default (set { autorun: false } to opt out). | | options | The resolved RemoteWorkerOptions (below). | Note: Outcome signals — `moveToCompleted`, `moveToFailed`, `moveToDelayed` — and the errors `UnrecoverableJobError` / `RateLimitError` are how a processor reports results. See Workers. ### Job #### Properties | Property | Description | | --- | --- | | id | Job id. Use it as the idempotency key for side effects. | | name | Job name passed to add(). | | data | The job payload (typed as TData). | | opts | The applied job options, including clamped retention — what was actually stored. | | progress | Last value reported by updateProgress(). | | returnvalue | The processor's return value once completed. 
| | failedReason | Error message of the last failed attempt. | | stacktrace | Captured stack traces for failed attempts. | | attemptsMade | How many attempts have run. | | `timestamp` / `processedOn` / `finishedOn` | Created, first-processed, and finished epoch ms. | | `parent` / `parentKey` | For a flow child, a reference to its parent. | | deliveryId | Identifies one delivery attempt — finer-grained than id for per-attempt logging/dedup. | #### State | Method | Description | | --- | --- | | getState() | Current state: 'waiting' \| 'active' \| 'completed' \| 'failed' \| 'delayed' \| … | | refresh() | Re-fetch the latest job data from the server, mutating this instance. | | `isCompleted` / `isFailed` / `isActive` / `isWaiting` / `isDelayed` / `isWaitingChildren` () | Convenience booleans over the last known state. | #### Mutate & act | Method | Description | | --- | --- | | updateProgress(value) | Report progress (a number or an object). Call from inside a processor. | | log(line) | Append a log line, retained up to keepLogs. Returns the new line count. | | updateData(data) | Replace the job's data payload. | | retry(state?) | Move a failed (or completed) job back to waiting to run again. | | promote() | Move a delayed job to waiting now. | | changeDelay(ms) | Reschedule a delayed job. | | changePriority({ priority?, lifo? }) | Change a waiting job's priority or LIFO placement. | | remove({ removeChildren? }) | Delete the job; pass removeChildren for flow trees. | #### Outcome signals (inside a processor) | Method | Description | | --- | --- | | moveToCompleted(value) | Complete the job — same as returning value. Execution stops after the call. | | moveToFailed(error) | Fail the job — same as throwing. UnrecoverableJobError skips remaining attempts. | | moveToDelayed(timestamp) | Re-run the job at an absolute epoch-ms time. | | moveToWaitingChildren() | Not supported — model graphs with FlowProducer. Throws a clear error. 
| #### Flows & waiting | Method | Description | | --- | --- | | getChildrenValues() | In a flow parent, the children's return values keyed by each child's job key, a ':'-separated string. | | waitUntilFinished({ timeoutMs?, pollIntervalMs? }) | Poll until the job completes or fails. The serverless-safe replacement for QueueEvents. | ### FlowProducer | Method | Description | | --- | --- | | add(flow) | Create one parent/child graph. Children run first; the parent runs after they finish. | | addBulk(flows) | Create multiple independent flows in one call (up to 100). | | getFlow({ id, queueName, depth?, maxChildren? }) | Fetch a flow tree for inspection. | ### @nextmq/sdk/next | Export | Description | | --- | --- | | createNextMQHandler({ workers }) | Returns { GET, POST } for the catch-all webhook route. Verifies signatures, dispatches jobs, serves /health. Node runtime only. | | ensureWorkersRegistered(workers, opts?) | Register workers' callback URLs up front (e.g. from instrumentation.ts). Idempotent. { includeNonAutorun: true } registers autorun:false workers too. | | getWorkerRegistrationStatuses(workers) | Per-worker registration status, useful in a health check or smoke test. | ### Job options (add) The third argument to `add()`. Unsupported options are rejected, never silently dropped. | Option | Type / bound | Description | | --- | --- | --- | | delay | ms ≥ 0 | Wait this long before the job becomes eligible. Cannot be combined with repeat. | | attempts | int ≥ 0 | Total tries before the job is marked failed. | | backoff | { type, delay } | Retry spacing: type 'fixed' or 'exponential', delay in ms. | | priority | 0 – 2,097,152 | Lower runs first; 0 means unprioritized. | | jobId | string | Custom id. Cannot contain ':' or be an integer string. One canonical job per business object. | | lifo | boolean | Push to the front of the queue instead of the back. | | keepLogs | int ≥ 0 | Max log lines to retain (clamped to the project cap).
| | sizeLimit | bytes ≥ 1 | Reject the job if its serialized data exceeds this. | | removeOnComplete | bool \| n \| { age, count? } | Retention for completed jobs. Clamped to project caps. | | removeOnFail | bool \| n \| { age, count? } | Retention for failed jobs. Clamped to project caps. | | deduplication | { id, … } | Collapse duplicate work within a window — see below. | | repeat | { pattern \| every, … } | Inline repeatable config; prefer Job Schedulers. | #### Flow child options | Option | Description | | --- | --- | | failParentOnFailure | If this child fails, fail the parent immediately. | | continueParentOnFailure | Let the parent proceed even if this child fails. | | ignoreDependencyOnFailure | Don't block the parent on this child's result if it fails. | | removeDependencyOnFailure | Drop this child from the parent's dependency set on failure. | ### Worker options | Option | Bound / default | Description | | --- | --- | --- | | concurrency | 1 – 100; default 1 | Max jobs this worker runs in parallel. | | limiter | { max ≥ 1, duration ≥ 100 } | Rate limit: at most max jobs per duration ms. | | timeoutMs | 1,000 – 300,000; default 30,000 | How long NextMQ waits for your webhook response. Keep below your platform's function timeout. | | autorun | default true | Whether the handler registers this worker by default. | | lockDuration | 1,000 – 300,000 | How long a job's lock is held while processing. Advanced. | | lockRenewTime | 500 – 300,000 | Lock renew interval; must be < lockDuration. Advanced. | | stalledInterval | 1,000 – 300,000 | How often stalled jobs are checked. Advanced. | | maxStalledCount | 0 – 100 | Times a job may stall before failing. Advanced. | | drainDelay | 1 – 300,000 | Delay used while the queue drains. Advanced. | ### Scheduler options The repeat argument to `upsertJobScheduler`. Provide exactly one of `pattern` or `every`. | Option | Description | | --- | --- | | pattern | Cron expression (e.g. '0 9 * * *'). 
| every | Fixed interval in ms (alternative to pattern). |
| limit | Stop after this many runs. |
| tz | Timezone for cron evaluation (follows DST). |
| `startDate` / `endDate` | Don't fire before / after these times. |
| immediately | Produce the first job now instead of after one interval (not with startDate). |

### Deduplication options

| Option | Description |
| --- | --- |
| id | Deduplication key. A second add with the same id is collapsed. |
| ttl | Window in ms during which duplicates collapse. |
| extend | Refresh the ttl on each duplicate add. |
| replace | Replace the pending job's data with the newer add. |
| keepLastIfActive | Keep deduplicating against a job that's currently active. |

### Errors & control flow

| Export | Use |
| --- | --- |
| UnrecoverableJobError | Throw in a processor to fail permanently without using remaining attempts. |
| RateLimitError | Throw with a retry-after delay to pause/requeue on a provider rate limit, without burning an attempt. |
| JobRemovedError | Thrown by waitUntilFinished when the job is gone and no result snapshot remains. |
| NextMQHttpError | A non-2xx response to an SDK request — inspect .status and body. |
| NextMQRequestTimeoutError | An SDK request exceeded NEXTMQ_REQUEST_TIMEOUT_MS. |
| WorkerRegistrationError | ensureWorkersRegistered could not register one or more workers. |
| isNextMQControlFlowError(err) | Re-throw internal moveTo* signals from a broad try/catch so they aren't swallowed. |
| resetNextMQConfig() | Clear cached env-derived config — mainly for tests that change process.env between cases. |

### Limits

The fixed technical limits the SDK and server enforce. Plan quotas (queues, jobs, throughput included) are separate — see pricing.

#### Identifiers and jobs

| Limit | Value |
| --- | --- |
| Queue name | 1-128 characters; letters, numbers, _, -, and . only. |
| jobId | Cannot contain ':' and cannot be an integer string. |
| priority | 0 to 2,097,152. |
| addBulk() | At most 1,000 jobs per call. |
| Job list, log, and scheduler pages | At most 1,000 items per request. |
| Child values page | At most 1,000 processed child values per request. |

#### Flows

| Limit | Value |
| --- | --- |
| Flow depth | At most 20 levels. |
| Children per flow node | At most 1,000. |
| FlowProducer.addBulk() | At most 100 independent flows per call. |
| Flow child deduplication | Not supported on child jobs. |

#### Workers and webhooks

| Limit | Value |
| --- | --- |
| concurrency | 1 to 100; default 1. |
| timeoutMs | 1,000 to 300,000 ms; default 30,000 ms. |
| limiter.duration | Minimum 100 ms. |
| NEXTMQ_WEBHOOK_BODY_LIMIT_BYTES | Default 1,048,576; max 10,485,760. |
| NEXTMQ_REQUEST_TIMEOUT_MS | Default 15,000; max 300,000. |

#### Retention defaults

| Setting | Default |
| --- | --- |
| removeOnComplete | Completed jobs are capped at 1,000 or 1 day. |
| removeOnFail | Failed jobs are capped at 5,000 or 7 days. |
| keepLogs | 100 log lines. |

Explicit retention values are clamped to the project caps. Immediate removal still keeps a short result snapshot for polling callers; see Reliability & delivery guarantees.

## Testing jobs

Route: /docs/testing

Treat processors like ordinary async functions first, then add one small integration check for registration and webhook reachability.

### Unit-test processor code

Put the business logic in a plain function, then call it from the worker. Most tests should hit that function directly without NextMQ, HTTP, or Redis.
File: jobs/reports.ts

```ts
import { Queue, Worker, type Job } from '@nextmq/sdk'

type ReportJob = { reportId: string }

export const reportQueue = new Queue('reports')

// renderReport is your app's own rendering helper (its import is not shown here).
export async function buildReport(data: ReportJob) {
  if (!data.reportId) throw new Error('reportId is required')
  return { url: await renderReport(data.reportId) }
}

export async function processReport(job: Job) {
  await job.updateProgress(25)
  const result = await buildReport(job.data)
  await job.updateProgress(100)
  return result
}

export const reportWorker = new Worker('reports', processReport)
```

File: jobs/reports.test.ts

```ts
import { expect, it, vi } from 'vitest'
import { buildReport, processReport } from './reports'

it('validates inputs without the queue', async () => {
  await expect(buildReport({ reportId: '' })).rejects.toThrow('reportId')
})

it('reports progress from the processor', async () => {
  const updateProgress = vi.fn()

  await processReport({
    id: 'job-1',
    data: { reportId: 'r_123' },
    updateProgress,
  } as any)

  expect(updateProgress).toHaveBeenCalledWith(25)
  expect(updateProgress).toHaveBeenCalledWith(100)
})
```

### Test idempotency

Reliability bugs usually come from side effects that run twice. Write a test that calls the processor twice with the same `job.id`.

```ts
it('does not charge twice for the same job id', async () => {
  const job = { id: 'job-42', data: { orderId: 'ord_42' } } as any

  await payoutProcessor(job)
  await payoutProcessor(job)

  expect(bank.transfer).toHaveBeenCalledTimes(1)
})
```

### Check registration in CI or smoke tests

A deployed app should answer the handler health path with registered workers. That verifies environment variables, worker construction, and the callback URL in one request.

```bash
curl -fsS https://your-app.com/api/nextmq/health
```

Heads up: A 200 health response means the workers registered. It does not prove every downstream service your processor calls is healthy.

### Local end-to-end

NextMQ calls your app over the public internet.
For local E2E, expose the local Next.js server through a tunnel and set `NEXTMQ_WEBHOOK_BASE_URL` to that origin — see Local development for ngrok and Cloudflare Tunnel setup.

```bash
NEXTMQ_CONNECTION_STRING=nextmq://v1....
NEXT_PUBLIC_APP_URL=http://localhost:3000
NEXTMQ_WEBHOOK_BASE_URL=https://your-tunnel.ngrok.app
```

### What to mock

| Thing | Recommended test |
| --- | --- |
| Processor business logic | Call your function directly with normal objects. |
| Job methods | Mock only the methods you call: updateProgress, log, getChildrenValues. |
| Queue producer code | Assert add/addBulk was called with the expected name, data, and opts. |
| Webhook route | Smoke test /api/nextmq/health in a deployed environment. |
| Full queue lifecycle | Use a tunnel and a real project for one E2E path. |

### Useful assertions

- Retries do not duplicate irreversible side effects.
- Permanent validation errors throw `UnrecoverableJobError`.
- Provider 429s throw `RateLimitError` with the expected delay.
- Startup registration does not throw in Node runtime.
- The handler route is not deployed to the Edge runtime.

For production failure modes, see Reliability & delivery guarantees.
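
As a companion to the idempotency test earlier on this page, here is a minimal sketch of a processor shaped so that calling it twice with the same job id performs the side effect once. Everything in it is an assumption made for illustration: `JobLike`, `Ledger`, `memoryLedger`, and `makePayoutProcessor` are hypothetical names and are not part of `@nextmq/sdk`, and the in-memory ledger stands in for whatever durable store your app would use.

```ts
// Hypothetical stand-ins for illustration only; none of these come from @nextmq/sdk.
type JobLike<T> = { id: string; data: T }
type Payout = { orderId: string; amountCents: number }

// Records which job ids have already produced their side effect.
interface Ledger {
  has(jobId: string): Promise<boolean>
  mark(jobId: string): Promise<void>
}

// In-memory ledger for tests. A real app would need a durable store with an
// atomic "insert if absent", because this check-then-mark sequence is not
// safe against two concurrent deliveries of the same job.
function memoryLedger(): Ledger {
  const seen = new Set<string>()
  return {
    has: async (id) => seen.has(id),
    mark: async (id) => {
      seen.add(id)
    },
  }
}

// Builds a processor that skips the transfer when the job id was already handled.
function makePayoutProcessor(
  ledger: Ledger,
  transfer: (payout: Payout) => Promise<void>,
) {
  return async function payoutProcessor(job: JobLike<Payout>) {
    if (await ledger.has(job.id)) return { skipped: true }
    await transfer(job.data)
    await ledger.mark(job.id)
    return { skipped: false }
  }
}
```

Injecting the ledger and the transfer function keeps the processor testable with plain objects, in line with the "What to mock" table: a test passes a counting stub for `transfer` and asserts it ran once after two calls with the same id.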