NextMQ

Queues & job options

A Queue is a named producer. Creating one is a local operation; add() and addBulk() send your jobs to NextMQ, which validates each job's options and then runs the job.

Note
Keep queue names and custom job IDs colon-free. NextMQ uses BullMQ's native key layout for flows, where colons are structural separators.
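
One way to apply the note above is to check names on the client before they reach the queue. The sketch below is illustrative only: `assertColonFree` and `makeJobId` are hypothetical helpers, not part of `@nextmq/sdk`, and replacing colons with `-` is an assumed convention.

```typescript
// Hypothetical helpers (not part of @nextmq/sdk): keep colons out of
// queue names and custom job ids before they are sent anywhere.

function assertColonFree(kind: 'queue name' | 'job id', value: string): string {
  if (value.includes(':')) {
    throw new Error(`${kind} "${value}" contains ":"; use "-" or "_" instead`)
  }
  return value
}

// Joins parts into a deterministic job id, replacing any colon with "-".
function makeJobId(...parts: string[]): string {
  const joined = parts.map((part) => part.replace(/:/g, '-')).join('-')
  return assertColonFree('job id', joined)
}

console.log(makeJobId('digest', 'u_1')) // prints "digest-u_1"
```

A deterministic id like this could be passed as the `jobId` option so that a repeated enqueue for the same user maps to the same job.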

Creating a queue

import { Queue } from '@nextmq/sdk'

type NotifyJob = { userId: string; channel: 'push' | 'sms' }
const notifications = new Queue<NotifyJob>('notifications')

Adding jobs

// Single job
await notifications.add('digest', { userId, channel: 'push' })

// With options
await notifications.add('digest', { userId, channel: 'push' }, {
  delay: 5000,
  attempts: 3,
  backoff: { type: 'exponential', delay: 2000 },
  priority: 1,
})
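
To see what `attempts` and `backoff` imply together, the following sketch computes the wait before each retry. It assumes NextMQ follows BullMQ's built-in formulas (fixed: `delay`; exponential: `delay * 2^(retry - 1)`); `retryDelay` and `retrySchedule` are hypothetical names used only for illustration.

```typescript
type Backoff = { type: 'fixed' | 'exponential'; delay: number }

// Wait in ms before retry number `retry` (1 = first retry).
// Assumption: same formulas as BullMQ's built-in backoff strategies.
function retryDelay(backoff: Backoff, retry: number): number {
  return backoff.type === 'fixed'
    ? backoff.delay
    : backoff.delay * Math.pow(2, retry - 1)
}

// `attempts` counts total tries, so there are `attempts - 1` retries.
function retrySchedule(attempts: number, backoff: Backoff): number[] {
  const delays: number[] = []
  for (let retry = 1; retry < attempts; retry++) {
    delays.push(retryDelay(backoff, retry))
  }
  return delays
}

console.log(retrySchedule(3, { type: 'exponential', delay: 2000 })) // [ 2000, 4000 ]
```

Under these assumptions, the options shown above would give one initial try plus two retries, after roughly 2 s and 4 s.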

addBulk

await notifications.addBulk([
  { name: 'digest', data: { userId: 'u_1', channel: 'push' } },
  { name: 'digest', data: { userId: 'u_2', channel: 'sms' }, opts: { delay: 1000 } },
])
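
When the list of jobs comes from data rather than literals, the array passed to addBulk() can be built with a small pure function. This is a sketch: `BulkEntry` is a local type that mirrors the `{ name, data, opts }` shape used above, and `toDigestJobs` with its `staggerMs` parameter is a hypothetical helper, not an SDK function.

```typescript
type NotifyJob = { userId: string; channel: 'push' | 'sms' }

// Local stand-in for the entry shape shown in the addBulk example.
type BulkEntry<T> = { name: string; data: T; opts?: { delay?: number } }

// Builds one 'digest' entry per user; optionally staggers them by
// giving the i-th entry a delay of i * staggerMs milliseconds.
function toDigestJobs(users: NotifyJob[], staggerMs = 0): BulkEntry<NotifyJob>[] {
  return users.map((data, i) => {
    const entry: BulkEntry<NotifyJob> = { name: 'digest', data }
    if (staggerMs > 0 && i > 0) entry.opts = { delay: i * staggerMs }
    return entry
  })
}

const entries = toDigestJobs(
  [
    { userId: 'u_1', channel: 'push' },
    { userId: 'u_2', channel: 'sms' },
  ],
  1000,
)
console.log(entries.length) // 2
```

The resulting array has the same shape as the literal passed to `notifications.addBulk(...)` above.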

Supported job options

Pass any of these in the third argument to add(). Options that can't survive a remote queue are rejected with a clear error, never silently dropped.

| Option | Description |
| --- | --- |
| delay | Milliseconds to wait before the job becomes eligible to run. |
| attempts | Total times to try the job before it's marked failed. |
| backoff | Retry delay strategy: { type: 'fixed' \| 'exponential', delay }. |
| priority | Lower number runs first. |
| jobId | Custom job id (colon-free). Enables idempotent enqueues. |
| lifo | Push to the front of the queue instead of the back. |
| keepLogs | Max number of log lines to retain. |
| sizeLimit | Reject the job if its serialized data exceeds this many bytes. |
| removeOnComplete | Retention for completed jobs (see clamping below). |
| removeOnFail | Retention for failed jobs (see clamping below). |
| repeat | Limited repeatable config; prefer Job Schedulers. |
| deduplication | Collapse duplicate work (see below). |
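
As an illustration of the sizeLimit option, a payload can be measured on the client before enqueueing. The sketch assumes the limit is compared against the UTF-8 byte length of the JSON-serialized data, which is how BullMQ measures it; whether NextMQ measures it identically is an assumption. `serializedBytes` and `fitsSizeLimit` are hypothetical helpers.

```typescript
// Assumption: size is the UTF-8 byte length of JSON.stringify(data).
function serializedBytes(data: unknown): number {
  return new TextEncoder().encode(JSON.stringify(data)).length
}

// True when the payload would fit under the given sizeLimit (bytes).
function fitsSizeLimit(data: unknown, sizeLimit: number): boolean {
  return serializedBytes(data) <= sizeLimit
}

console.log(fitsSizeLimit({ userId: 'u_1', channel: 'push' }, 1024)) // true
```

A check like this only predicts the outcome; the queue remains the authority on whether a job is accepted.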

Retention is clamped

Native BullMQ keeps completed and failed jobs forever by default. To keep your project's storage bounded, NextMQ clamps retention on every job:

  • Unset or false ("keep forever") becomes a { count, age } cap.
  • Explicit values are clamped to your project's caps.
  • keepLogs is capped the same way.

The applied values are visible in the job's serialized opts, so you can always see what was stored.
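
The rules above can be modelled as a small pure function. This is a sketch of the described behaviour, not NextMQ's implementation: the cap values are made up, `clampRetention` is a hypothetical name, the treatment of `true` (remove immediately) as already within any cap is an assumption, and so is applying the age cap when only a count is given.

```typescript
// Shapes follow BullMQ's removeOnComplete / removeOnFail option.
type KeepJobs = { count?: number; age?: number } // age in seconds
type Retention = boolean | number | KeepJobs | undefined
type Caps = { count: number; age: number }

function clampRetention(value: Retention, caps: Caps): true | Caps {
  // Assumption: "remove immediately" never exceeds a cap, so it is kept.
  if (value === true) return true
  // Unset or false ("keep forever") becomes the project's caps.
  if (value === undefined || value === false) return { count: caps.count, age: caps.age }
  // A bare number is a count; the age cap is still applied (assumption).
  if (typeof value === 'number') {
    return { count: Math.min(value, caps.count), age: caps.age }
  }
  // Explicit { count, age }: each field is clamped, missing ones take the cap.
  return {
    count: Math.min(value.count ?? caps.count, caps.count),
    age: Math.min(value.age ?? caps.age, caps.age),
  }
}

const projectCaps: Caps = { count: 1000, age: 86_400 } // hypothetical caps
console.log(clampRetention(false, projectCaps)) // { count: 1000, age: 86400 }
```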

Deduplication

Pass a deduplication id to collapse duplicate work within a TTL window — handy when a form can be double-submitted or a request gets retried.

await exports.add('csv', { reportId }, {
  deduplication: { id: `csv-${reportId}`, ttl: 60_000 },
})

Supported fields: id, ttl, extend, replace, and keepLastIfActive. The helpers getDeduplicationJobId and removeDeduplicationKey are available on the queue.
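
To make the TTL window concrete, here is a simplified in-memory model of deduplication by id and ttl. It is only a mental model under stated assumptions: it ignores the extend, replace, and keepLastIfActive fields, it treats the window as closing exactly at `ttl` milliseconds, and `DedupWindow` is a hypothetical class, not an SDK type.

```typescript
type DedupOpts = { id: string; ttl: number }

// Simplified model: the first add for an id opens a window of `ttl` ms;
// adds with the same id inside the window collapse onto the first job.
class DedupWindow {
  private entries = new Map<string, { jobId: string; expiresAt: number }>()
  private nextId = 1

  add(opts: DedupOpts, now: number): { jobId: string; deduplicated: boolean } {
    const hit = this.entries.get(opts.id)
    if (hit && now < hit.expiresAt) {
      return { jobId: hit.jobId, deduplicated: true }
    }
    const jobId = String(this.nextId++)
    this.entries.set(opts.id, { jobId, expiresAt: now + opts.ttl })
    return { jobId, deduplicated: false }
  }
}

const dedup = new DedupWindow()
const opts = { id: 'csv-r_1', ttl: 60_000 }
console.log(dedup.add(opts, 0))      // { jobId: '1', deduplicated: false }
console.log(dedup.add(opts, 30_000)) // { jobId: '1', deduplicated: true }
console.log(dedup.add(opts, 60_000)) // { jobId: '2', deduplicated: false }
```

In this model a double-submitted form inside the 60-second window produces a single job, and a submission after the window starts a new one.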

Queue-wide throughput controls

These apply across every worker on the queue — distinct from per-worker concurrency and limiter. Useful for protecting a shared downstream like a third-party or AI API.

await ai.setGlobalConcurrency(50)
await ai.setGlobalRateLimit(100, 1000)  // max, duration (ms)

await ai.getGlobalConcurrency()         // number | null
await ai.getGlobalRateLimit()           // { max, duration } | null
await ai.removeGlobalRateLimit()
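
The `(max, duration)` pair can be read as "at most max job starts per duration milliseconds". The sketch below models that with a fixed window. It is a deliberately simplified illustration: the actual windowing used by NextMQ or BullMQ may differ, and `WindowLimiter` is a hypothetical class.

```typescript
// Simplified fixed-window model of a (max, duration) rate limit.
class WindowLimiter {
  private max: number
  private duration: number
  private windowStart = 0
  private used = 0

  constructor(max: number, duration: number) {
    this.max = max
    this.duration = duration
  }

  // Returns true if a job may start at time `now` (ms), false if the
  // current window has already used its `max` starts.
  tryStart(now: number): boolean {
    if (now - this.windowStart >= this.duration) {
      this.windowStart = now
      this.used = 0
    }
    if (this.used >= this.max) return false
    this.used++
    return true
  }
}

const limiter = new WindowLimiter(2, 1000)
console.log(limiter.tryStart(0))    // true
console.log(limiter.tryStart(10))   // true
console.log(limiter.tryStart(20))   // false
console.log(limiter.tryStart(1000)) // true
```

Because the limit is queue-wide, the count in this model is shared by every worker rather than tracked per worker.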

Tip
Need recurring jobs? Use Job Schedulers rather than ad-hoc repeat options.