diff --git a/.gitignore b/.gitignore index 3df4c92..9153b8c 100644 --- a/.gitignore +++ b/.gitignore @@ -15,6 +15,9 @@ coverage npm-debug.log yarn-error.log +# OS specific +.DS_Store + # Editors specific .fleet .idea diff --git a/README.md b/README.md index aafc6cf..a3a0969 100644 --- a/README.md +++ b/README.md @@ -26,6 +26,7 @@ npm install @boringnode/queue - **Priority Queues**: Process high-priority jobs first - **Bulk Dispatch**: Efficiently dispatch thousands of jobs at once - **Job Grouping**: Organize related jobs for monitoring +- **Job Deduplication**: Prevent duplicate jobs with custom IDs - **Retry with Backoff**: Exponential, linear, or fixed backoff strategies - **Job Timeout**: Fail or retry jobs that exceed a time limit - **Job History**: Retain completed/failed jobs for debugging @@ -131,6 +132,38 @@ await SendEmailJob.dispatchMany(recipients).group('newsletter-jan-2025') The `groupId` is stored with job data and accessible via `job.data.groupId`. +## Job Deduplication + +Prevent the same job from being pushed to the queue twice using `.dedup()`: + +```typescript +// First dispatch - job is created +await SendInvoiceJob.dispatch({ orderId: 123 }).dedup({ id: 'order-123' }).run() + +// Second dispatch with same dedup ID - silently skipped +await SendInvoiceJob.dispatch({ orderId: 123 }).dedup({ id: 'order-123' }).run() +``` + +The dedup ID is automatically prefixed with the job name, so different job types can use the same ID without conflicts: + +```typescript +// These are two different jobs, no conflict +await SendInvoiceJob.dispatch({ orderId: 123 }).dedup({ id: 'order-123' }).run() +await SendReceiptJob.dispatch({ orderId: 123 }).dedup({ id: 'order-123' }).run() +``` + +Deduplication is atomic and race-condition-free for adapters that support storage-level uniqueness checks: + +- **Redis**: Uses `HSETNX` (set-if-not-exists) +- **Knex**: Uses `INSERT ... ON CONFLICT DO NOTHING` +- **SyncAdapter**: Executes jobs inline and does not support deduplication + +> [!NOTE] +> Without `.dedup()`, jobs use auto-generated UUIDs and are never deduplicated. The `.dedup()` method is only available on single dispatch, not `dispatchMany`. + +> [!TIP] +> When job retention is enabled (`removeOnComplete: false`), completed jobs remain in storage. A re-dispatch with the same dedup ID will be silently skipped since the record still exists. With the default retention (`true`), completed jobs are removed immediately, so re-dispatch with the same ID succeeds normally. + ## Job History & Retention Keep completed and failed jobs for debugging: diff --git a/src/drivers/fake_adapter.ts b/src/drivers/fake_adapter.ts index ddc528a..e5f830c 100644 --- a/src/drivers/fake_adapter.ts +++ b/src/drivers/fake_adapter.ts @@ -160,6 +160,11 @@ export class FakeAdapter implements Adapter { } async pushOn(queue: string, jobData: JobData): Promise { + if (jobData.dedup) { + const existing = await this.getJob(jobData.id, queue) + if (existing) return + } + this.#recordPush(queue, jobData) this.#enqueue(queue, jobData) } @@ -168,7 +173,12 @@ export class FakeAdapter implements Adapter { return this.pushLaterOn('default', jobData, delay) } - pushLaterOn(queue: string, jobData: JobData, delay: number): Promise { + async pushLaterOn(queue: string, jobData: JobData, delay: number): Promise { + if (jobData.dedup) { + const existing = await this.getJob(jobData.id, queue) + if (existing) return + } + this.#recordPush(queue, jobData, delay) this.#schedulePush(queue, jobData, delay) diff --git a/src/drivers/knex_adapter.ts b/src/drivers/knex_adapter.ts index 56bcd34..5e688af 100644 --- a/src/drivers/knex_adapter.ts +++ b/src/drivers/knex_adapter.ts @@ -370,13 +370,19 @@ export class KnexAdapter implements Adapter { const timestamp = Date.now() const score = calculateScore(priority, timestamp) - await this.#connection(this.#jobsTable).insert({ + const query = this.#connection(this.#jobsTable).insert({ id: jobData.id, queue, status: 'pending', data: JSON.stringify(jobData), score, }) + + if (jobData.dedup) { + await query.onConflict(['id', 'queue']).ignore() + } else { + await query + } } async pushLater(jobData: JobData, delay: number): Promise { @@ -386,13 +392,19 @@ export class KnexAdapter implements Adapter { async pushLaterOn(queue: string, jobData: JobData, delay: number): Promise { const executeAt = Date.now() + delay - await this.#connection(this.#jobsTable).insert({ + const query = this.#connection(this.#jobsTable).insert({ id: jobData.id, queue, status: 'delayed', data: JSON.stringify(jobData), execute_at: executeAt, }) + + if (jobData.dedup) { + await query.onConflict(['id', 'queue']).ignore() + } else { + await query + } } async pushMany(jobs: JobData[]): Promise { diff --git a/src/drivers/redis_adapter.ts b/src/drivers/redis_adapter.ts index bb66c05..054ac9e 100644 --- a/src/drivers/redis_adapter.ts +++ b/src/drivers/redis_adapter.ts @@ -35,6 +35,26 @@ const PUSH_JOB_SCRIPT = ` return 1 ` +/** + * Lua script for pushing a dedup job. + * Uses HSETNX to only store data if the job doesn't already exist. + * Only adds to pending ZSET if the job was newly created. + */ +const PUSH_DEDUP_JOB_SCRIPT = ` + local data_key = KEYS[1] + local pending_key = KEYS[2] + local job_id = ARGV[1] + local job_data = ARGV[2] + local score = tonumber(ARGV[3]) + + local added = redis.call('HSETNX', data_key, job_id, job_data) + if added == 1 then + redis.call('ZADD', pending_key, score, job_id) + end + + return added +` + /** * Lua script for pushing a delayed job. * Stores job data in the central hash and adds jobId to delayed ZSET. @@ -52,6 +72,26 @@ const PUSH_DELAYED_JOB_SCRIPT = ` return 1 ` +/** + * Lua script for pushing a dedup delayed job. + * Uses HSETNX to only store data if the job doesn't already exist. + * Only adds to delayed ZSET if the job was newly created. + */ +const PUSH_DEDUP_DELAYED_JOB_SCRIPT = ` + local data_key = KEYS[1] + local delayed_key = KEYS[2] + local job_id = ARGV[1] + local job_data = ARGV[2] + local execute_at = tonumber(ARGV[3]) + + local added = redis.call('HSETNX', data_key, job_id, job_data) + if added == 1 then + redis.call('ZADD', delayed_key, execute_at, job_id) + end + + return added +` + /** * Lua script for atomic job acquisition. * 1. Check and process delayed jobs @@ -620,8 +660,10 @@ export class RedisAdapter implements Adapter { const keys = this.#getKeys(queue) const executeAt = Date.now() + delay + const script = jobData.dedup ? PUSH_DEDUP_DELAYED_JOB_SCRIPT : PUSH_DELAYED_JOB_SCRIPT + await this.#connection.eval( - PUSH_DELAYED_JOB_SCRIPT, + script, 2, keys.data, keys.delayed, @@ -637,8 +679,10 @@ export class RedisAdapter implements Adapter { const timestamp = Date.now() const score = calculateScore(priority, timestamp) + const script = jobData.dedup ? PUSH_DEDUP_JOB_SCRIPT : PUSH_JOB_SCRIPT + await this.#connection.eval( - PUSH_JOB_SCRIPT, + script, 2, keys.data, keys.pending, diff --git a/src/job_dispatcher.ts b/src/job_dispatcher.ts index 9198895..6a104d6 100644 --- a/src/job_dispatcher.ts +++ b/src/job_dispatcher.ts @@ -15,11 +15,12 @@ import { parse } from './utils.js' * * ``` * Job.dispatch(payload) - * .toQueue('emails') // optional: target queue - * .priority(1) // optional: 1-10, lower = higher priority - * .in('5m') // optional: delay before processing - * .with('redis') // optional: specific adapter - * .run() // dispatch the job + * .toQueue('emails') // optional: target queue + * .priority(1) // optional: 1-10, lower = higher priority + * .in('5m') // optional: delay before processing + * .dedup({ id: 'order-123' }) // optional: deduplication + * .with('redis') // optional: specific adapter + * .run() // dispatch the job * ``` * * @typeParam T - The payload type for this job @@ -47,6 +48,7 @@ export class JobDispatcher { #delay?: Duration #priority?: number #groupId?: string + #dedup?: { id: string } /** * Create a new job dispatcher. @@ -148,6 +150,41 @@ export class JobDispatcher { return this } + /** + * Configure deduplication for this job. + * + * When deduplication is configured, the adapter will silently skip + * the job if one with the same dedup ID already exists in the queue. + * The ID is automatically prefixed with the job name to prevent + * collisions between different job types. + * + * @param options - Deduplication options + * @param options.id - Unique deduplication key + * @returns This dispatcher for chaining + * + * @example + * ```typescript + * // Prevent duplicate invoice jobs for the same order + * await SendInvoiceJob.dispatch({ orderId: 123 }) + * .dedup({ id: 'order-123' }) + * .run() + * + * // Second dispatch with same dedup ID is silently skipped + * await SendInvoiceJob.dispatch({ orderId: 123 }) + * .dedup({ id: 'order-123' }) + * .run() + * ``` + */ + dedup(options: { id: string }): this { + if (!options.id) { + throw new Error('Dedup ID must be a non-empty string') + } + + this.#dedup = options + + return this + } + /** * Use a specific adapter for this job. * @@ -181,7 +218,7 @@ export class JobDispatcher { * ``` */ async run(): Promise { - const id = randomUUID() + const id = this.#dedup ? `${this.#name}::${this.#dedup.id}` : randomUUID() debug('dispatching job %s with id %s using payload %s', this.#name, id, this.#payload) @@ -197,6 +234,7 @@ export class JobDispatcher { priority: this.#priority, groupId: this.#groupId, createdAt: Date.now(), + ...(this.#dedup ? { dedup: { id: this.#dedup.id } } : {}), } const message: JobDispatchMessage = { jobs: [jobData], queue: this.#queue, delay: parsedDelay } diff --git a/src/types/main.ts b/src/types/main.ts index 4bc68c9..599086b 100644 --- a/src/types/main.ts +++ b/src/types/main.ts @@ -133,6 +133,17 @@ export interface JobData { * Injected by OTel plugin at dispatch time. */ traceContext?: Record + + /** + * Deduplication configuration for this job. + * When set, adapters use atomic insert-if-not-exists semantics + * to silently skip duplicate jobs with the same ID. + * Set automatically when `.dedup()` is called on the dispatcher. + */ + dedup?: { + /** The original dedup key provided by the caller (before name-prefixing). */ + id: string + } } /** diff --git a/tests/_mocks/memory_adapter.ts b/tests/_mocks/memory_adapter.ts index 08fbc12..a944610 100644 --- a/tests/_mocks/memory_adapter.ts +++ b/tests/_mocks/memory_adapter.ts @@ -51,6 +51,11 @@ export class MemoryAdapter implements Adapter { } async pushOn(queue: string, jobData: JobData): Promise { + if (jobData.dedup) { + const existing = await this.getJob(jobData.id, queue) + if (existing) return + } + if (!this.#queues.has(queue)) { this.#queues.set(queue, []) } @@ -62,7 +67,12 @@ export class MemoryAdapter implements Adapter { return this.pushLaterOn('default', jobData, delay) } - pushLaterOn(queue: string, jobData: JobData, delay: number): Promise { + async pushLaterOn(queue: string, jobData: JobData, delay: number): Promise { + if (jobData.dedup) { + const existing = await this.getJob(jobData.id, queue) + if (existing) return + } + if (!this.#delayedJobs.has(queue)) { this.#delayedJobs.set(queue, new Map()) } diff --git a/tests/_utils/register_driver_test_suite.ts b/tests/_utils/register_driver_test_suite.ts index ecf5727..ab80635 100644 --- a/tests/_utils/register_driver_test_suite.ts +++ b/tests/_utils/register_driver_test_suite.ts @@ -613,7 +613,9 @@ export function registerDriverTestSuite(options: DriverTestSuiteOptions) { assert.isNull(job3) }) - test('recoverStalledJobs should only recover jobs from the targeted queue', async ({ assert }) => { + test('recoverStalledJobs should only recover jobs from the targeted queue', async ({ + assert, + }) => { const adapter = await options.createAdapter() adapter.setWorkerId('worker-1') @@ -1647,4 +1649,112 @@ export function registerDriverTestSuite(options: DriverTestSuiteOptions) { assert.equal(second!.id, 'medium') assert.equal(third!.id, 'low') }) + + test('pushOn with dedup should skip duplicate job', async ({ assert }) => { + const adapter = await options.createAdapter() + adapter.setWorkerId('worker-1') + + await adapter.pushOn('test-queue', { + id: 'TestJob::order-1', + name: 'TestJob', + payload: { attempt: 1 }, + attempts: 0, + dedup: { id: 'order-1' }, + }) + + await adapter.pushOn('test-queue', { + id: 'TestJob::order-1', + name: 'TestJob', + payload: { attempt: 2 }, + attempts: 0, + dedup: { id: 'order-1' }, + }) + + const size = await adapter.sizeOf('test-queue') + assert.equal(size, 1) + + const job = await adapter.popFrom('test-queue') + assert.deepEqual(job!.payload, { attempt: 1 }) + }) + + test('pushOn without dedup should insert normally', async ({ assert }) => { + const adapter = await options.createAdapter() + adapter.setWorkerId('worker-1') + + await adapter.pushOn('test-queue', { + id: 'job-1', + name: 'TestJob', + payload: { data: 'first' }, + attempts: 0, + }) + + await adapter.pushOn('test-queue', { + id: 'job-2', + name: 'TestJob', + payload: { data: 'second' }, + attempts: 0, + }) + + const size = await adapter.sizeOf('test-queue') + assert.equal(size, 2) + }) + + test('pushLaterOn with dedup should skip duplicate delayed job', async ({ assert }) => { + const adapter = await options.createAdapter() + adapter.setWorkerId('worker-1') + + await adapter.pushLaterOn( + 'test-queue', + { + id: 'TestJob::delayed-1', + name: 'TestJob', + payload: { attempt: 1 }, + attempts: 0, + dedup: { id: 'delayed-1' }, + }, + 60_000 + ) + + await adapter.pushLaterOn( + 'test-queue', + { + id: 'TestJob::delayed-1', + name: 'TestJob', + payload: { attempt: 2 }, + attempts: 0, + dedup: { id: 'delayed-1' }, + }, + 60_000 + ) + + const job = await adapter.getJob('TestJob::delayed-1', 'test-queue') + assert.isNotNull(job) + assert.deepEqual(job!.data.payload, { attempt: 1 }) + }) + + test('pushOn with dedup should allow same id on different queues', async ({ assert }) => { + const adapter = await options.createAdapter() + adapter.setWorkerId('worker-1') + + await adapter.pushOn('queue-a', { + id: 'TestJob::shared-id', + name: 'TestJob', + payload: { queue: 'a' }, + attempts: 0, + dedup: { id: 'shared-id' }, + }) + + await adapter.pushOn('queue-b', { + id: 'TestJob::shared-id', + name: 'TestJob', + payload: { queue: 'b' }, + attempts: 0, + dedup: { id: 'shared-id' }, + }) + + const sizeA = await adapter.sizeOf('queue-a') + const sizeB = await adapter.sizeOf('queue-b') + assert.equal(sizeA, 1) + assert.equal(sizeB, 1) + }) } diff --git a/tests/fake_adapter.spec.ts b/tests/fake_adapter.spec.ts index a4eae64..94924b6 100644 --- a/tests/fake_adapter.spec.ts +++ b/tests/fake_adapter.spec.ts @@ -91,6 +91,64 @@ test.group('FakeAdapter', () => { await adapter.destroy() }) + test('should skip duplicate pushOn when dedup is set', async ({ assert }) => { + const adapter = fake()() + + await adapter.pushOn('default', { + id: 'TestJob::order-1', + name: 'TestJob', + payload: { attempt: 1 }, + attempts: 0, + dedup: { id: 'order-1' }, + }) + + await adapter.pushOn('default', { + id: 'TestJob::order-1', + name: 'TestJob', + payload: { attempt: 2 }, + attempts: 0, + dedup: { id: 'order-1' }, + }) + + const size = await adapter.size() + assert.equal(size, 1) + adapter.assertPushedCount(1) + + await adapter.destroy() + }) + + test('should skip duplicate pushLaterOn when dedup is set', async () => { + const adapter = fake()() + + await adapter.pushLaterOn( + 'default', + { + id: 'TestJob::delayed-1', + name: 'TestJob', + payload: { attempt: 1 }, + attempts: 0, + dedup: { id: 'delayed-1' }, + }, + 5000 + ) + + await adapter.pushLaterOn( + 'default', + { + id: 'TestJob::delayed-1', + name: 'TestJob', + payload: { attempt: 2 }, + attempts: 0, + dedup: { id: 'delayed-1' }, + }, + 5000 + ) + + adapter.assertPushedCount(1) + + await adapter.destroy() + }) + test('should support job class matchers', async ({ assert }) => { const adapter = fake()() @@ -128,5 +186,4 @@ test.group('FakeAdapter', () => { await adapter.destroy() }) - }) diff --git a/tests/job_dispatcher.spec.ts b/tests/job_dispatcher.spec.ts index 7c71311..214b4e9 100644 --- a/tests/job_dispatcher.spec.ts +++ b/tests/job_dispatcher.spec.ts @@ -317,6 +317,119 @@ test.group('JobDispatcher | groupId', () => { }) }) +test.group('JobDispatcher | dedup', () => { + test('should throw error when dedup id is empty', async ({ assert }) => { + assert.throws( + () => new JobDispatcher('TestJob', { data: 'test' }).dedup({ id: '' }), + 'Dedup ID must be a non-empty string' + ) + }) + + test('should use dedup id prefixed with job name', async ({ assert }) => { + const sharedAdapter = memory()() + + await QueueManager.init({ + default: 'memory', + adapters: { memory: () => sharedAdapter }, + }) + + const { jobId } = await new JobDispatcher('SendInvoiceJob', { orderId: 123 }) + .dedup({ id: 'order-123' }) + .run() + + assert.equal(jobId, 'SendInvoiceJob::order-123') + + const job = await sharedAdapter.pop() + assert.isNotNull(job) + assert.equal(job!.id, 'SendInvoiceJob::order-123') + }) + + test('should set dedup field on job data when dedup is configured', async ({ assert }) => { + const sharedAdapter = memory()() + + await QueueManager.init({ + default: 'memory', + adapters: { memory: () => sharedAdapter }, + }) + + await new JobDispatcher('UniqueJob', { data: 'test' }).dedup({ id: 'my-id' }).run() + + const job = await sharedAdapter.pop() + assert.isNotNull(job) + assert.deepEqual(job!.dedup, { id: 'my-id' }) + }) + + test('should not set dedup field when dedup is not configured', async ({ assert }) => { + const sharedAdapter = memory()() + + await QueueManager.init({ + default: 'memory', + adapters: { memory: () => sharedAdapter }, + }) + + await new JobDispatcher('RegularJob', { data: 'test' }).run() + + const job = await sharedAdapter.pop() + assert.isNotNull(job) + assert.isUndefined(job!.dedup) + }) + + test('should silently skip duplicate job with same dedup id', async ({ assert }) => { + const sharedAdapter = memory()() + + await QueueManager.init({ + default: 'memory', + adapters: { memory: () => sharedAdapter }, + }) + + await new JobDispatcher('DedupJob', { attempt: 1 }).dedup({ id: 'dedup-1' }).run() + await new JobDispatcher('DedupJob', { attempt: 2 }).dedup({ id: 'dedup-1' }).run() + + const size = await sharedAdapter.size() + assert.equal(size, 1) + + const job = await sharedAdapter.pop() + assert.deepEqual(job!.payload, { attempt: 1 }) + }) + + test('should allow same dedup id for different job names', async ({ assert }) => { + const sharedAdapter = memory()() + + await QueueManager.init({ + default: 'memory', + adapters: { memory: () => sharedAdapter }, + }) + + await new JobDispatcher('JobA', { type: 'a' }).dedup({ id: 'same-id' }).run() + await new JobDispatcher('JobB', { type: 'b' }).dedup({ id: 'same-id' }).run() + + const size = await sharedAdapter.size() + assert.equal(size, 2) + }) + + test('should work with other options like priority and queue', async ({ assert }) => { + const sharedAdapter = memory()() + + await QueueManager.init({ + default: 'memory', + adapters: { memory: () => sharedAdapter }, + }) + + const { jobId } = await new JobDispatcher('PriorityDedupJob', { task: 'important' }) + .dedup({ id: 'task-1' }) + .toQueue('high') + .priority(1) + .run() + + assert.equal(jobId, 'PriorityDedupJob::task-1') + + const job = await sharedAdapter.popFrom('high') + assert.isNotNull(job) + assert.equal(job!.priority, 1) + assert.deepEqual(job!.dedup, { id: 'task-1' }) + }) +}) + test.group('JobBatchDispatcher', () => { test('should dispatch multiple jobs correctly', async ({ assert }) => { const sharedAdapter = memory()()