From 993b481169eed2f9818f90ef6fbce42bd973b289 Mon Sep 17 00:00:00 2001 From: Jignesh Sanghani Date: Fri, 10 Apr 2026 17:38:01 +0530 Subject: [PATCH 1/4] feat: add .id() method for job deduplication --- README.md | 32 ++++++ src/drivers/fake_adapter.ts | 13 +++ src/drivers/knex_adapter.ts | 16 ++- src/drivers/redis_adapter.ts | 48 ++++++++- src/job_dispatcher.ts | 38 ++++++- src/types/main.ts | 7 ++ tests/_mocks/memory_adapter.ts | 13 +++ tests/_utils/register_driver_test_suite.ts | 112 +++++++++++++++++++- tests/fake_adapter.spec.ts | 59 ++++++++++- tests/job_dispatcher.spec.ts | 113 +++++++++++++++++++++ 10 files changed, 444 insertions(+), 7 deletions(-) diff --git a/README.md b/README.md index aafc6cf..2add2d0 100644 --- a/README.md +++ b/README.md @@ -26,6 +26,7 @@ npm install @boringnode/queue - **Priority Queues**: Process high-priority jobs first - **Bulk Dispatch**: Efficiently dispatch thousands of jobs at once - **Job Grouping**: Organize related jobs for monitoring +- **Job Deduplication**: Prevent duplicate jobs with custom IDs - **Retry with Backoff**: Exponential, linear, or fixed backoff strategies - **Job Timeout**: Fail or retry jobs that exceed a time limit - **Job History**: Retain completed/failed jobs for debugging @@ -131,6 +132,37 @@ await SendEmailJob.dispatchMany(recipients).group('newsletter-jan-2025') The `groupId` is stored with job data and accessible via `job.data.groupId`. +## Job Deduplication + +Prevent the same job from being pushed to the queue twice using custom job IDs: + +```typescript +// First dispatch - job is created +await SendInvoiceJob.dispatch({ orderId: 123 }).id('order-123').run() + +// Second dispatch with same ID - silently skipped +await SendInvoiceJob.dispatch({ orderId: 123 }).id('order-123').run() +``` + +The custom ID is automatically prefixed with the job name, so different job types can use the same ID without conflicts: + +```typescript +// These are two different jobs, no conflict +await SendInvoiceJob.dispatch({ orderId: 123 }).id('order-123').run() +await SendReceiptJob.dispatch({ orderId: 123 }).id('order-123').run() +``` + +Deduplication is atomic and race-condition-free across all adapters: + +- **Redis**: Uses `HSETNX` (set-if-not-exists) +- **Knex**: Uses `INSERT ... ON CONFLICT DO NOTHING` + +> [!NOTE] +> Without `.id()`, jobs use auto-generated UUIDs and are never deduplicated. The `.id()` method is only available on single dispatch, not `dispatchMany`. + +> [!TIP] +> When job retention is enabled (`removeOnComplete: false`), completed jobs remain in storage. A re-dispatch with the same custom ID will be silently skipped since the record still exists. With the default retention (`true`), completed jobs are removed immediately, so re-dispatch with the same ID succeeds normally. + ## Job History & Retention Keep completed and failed jobs for debugging: diff --git a/src/drivers/fake_adapter.ts b/src/drivers/fake_adapter.ts index ddc528a..39bfbb8 100644 --- a/src/drivers/fake_adapter.ts +++ b/src/drivers/fake_adapter.ts @@ -160,6 +160,11 @@ export class FakeAdapter implements Adapter { } async pushOn(queue: string, jobData: JobData): Promise { + if (jobData.unique) { + const jobs = this.#queues.get(queue) + if (jobs?.some((j) => j.id === jobData.id)) return + } + this.#recordPush(queue, jobData) this.#enqueue(queue, jobData) } @@ -169,6 +174,14 @@ export class FakeAdapter implements Adapter { } pushLaterOn(queue: string, jobData: JobData, delay: number): Promise { + if (jobData.unique) { + const jobs = this.#queues.get(queue) + if (jobs?.some((j) => j.id === jobData.id)) return Promise.resolve() + + const delayed = this.#delayedJobs.get(queue) + if (delayed?.has(jobData.id)) return Promise.resolve() + } + this.#recordPush(queue, jobData, delay) this.#schedulePush(queue, jobData, delay) diff --git a/src/drivers/knex_adapter.ts b/src/drivers/knex_adapter.ts index 56bcd34..a421a7c 100644 --- a/src/drivers/knex_adapter.ts +++ b/src/drivers/knex_adapter.ts @@ -370,13 +370,19 @@ export class KnexAdapter implements Adapter { const timestamp = Date.now() const score = calculateScore(priority, timestamp) - await this.#connection(this.#jobsTable).insert({ + const query = this.#connection(this.#jobsTable).insert({ id: jobData.id, queue, status: 'pending', data: JSON.stringify(jobData), score, }) + + if (jobData.unique) { + await query.onConflict(['id', 'queue']).ignore() + } else { + await query + } } async pushLater(jobData: JobData, delay: number): Promise { @@ -386,13 +392,19 @@ export class KnexAdapter implements Adapter { async pushLaterOn(queue: string, jobData: JobData, delay: number): Promise { const executeAt = Date.now() + delay - await this.#connection(this.#jobsTable).insert({ + const query = this.#connection(this.#jobsTable).insert({ id: jobData.id, queue, status: 'delayed', data: JSON.stringify(jobData), execute_at: executeAt, }) + + if (jobData.unique) { + await query.onConflict(['id', 'queue']).ignore() + } else { + await query + } } async pushMany(jobs: JobData[]): Promise { diff --git a/src/drivers/redis_adapter.ts b/src/drivers/redis_adapter.ts index bb66c05..4e4bc4b 100644 --- a/src/drivers/redis_adapter.ts +++ b/src/drivers/redis_adapter.ts @@ -35,6 +35,26 @@ const PUSH_JOB_SCRIPT = ` return 1 ` +/** + * Lua script for pushing a unique job. + * Uses HSETNX to only store data if the job doesn't already exist. + * Only adds to pending ZSET if the job was newly created. + */ +const PUSH_UNIQUE_JOB_SCRIPT = ` + local data_key = KEYS[1] + local pending_key = KEYS[2] + local job_id = ARGV[1] + local job_data = ARGV[2] + local score = tonumber(ARGV[3]) + + local added = redis.call('HSETNX', data_key, job_id, job_data) + if added == 1 then + redis.call('ZADD', pending_key, score, job_id) + end + + return added +` + /** * Lua script for pushing a delayed job. * Stores job data in the central hash and adds jobId to delayed ZSET. @@ -52,6 +72,26 @@ const PUSH_DELAYED_JOB_SCRIPT = ` return 1 ` +/** + * Lua script for pushing a unique delayed job. + * Uses HSETNX to only store data if the job doesn't already exist. + * Only adds to delayed ZSET if the job was newly created. + */ +const PUSH_UNIQUE_DELAYED_JOB_SCRIPT = ` + local data_key = KEYS[1] + local delayed_key = KEYS[2] + local job_id = ARGV[1] + local job_data = ARGV[2] + local execute_at = tonumber(ARGV[3]) + + local added = redis.call('HSETNX', data_key, job_id, job_data) + if added == 1 then + redis.call('ZADD', delayed_key, execute_at, job_id) + end + + return added +` + /** * Lua script for atomic job acquisition. * 1. Check and process delayed jobs @@ -620,8 +660,10 @@ export class RedisAdapter implements Adapter { const keys = this.#getKeys(queue) const executeAt = Date.now() + delay + const script = jobData.unique ? PUSH_UNIQUE_DELAYED_JOB_SCRIPT : PUSH_DELAYED_JOB_SCRIPT + await this.#connection.eval( - PUSH_DELAYED_JOB_SCRIPT, + script, 2, keys.data, keys.delayed, @@ -637,8 +679,10 @@ export class RedisAdapter implements Adapter { const timestamp = Date.now() const score = calculateScore(priority, timestamp) + const script = jobData.unique ? PUSH_UNIQUE_JOB_SCRIPT : PUSH_JOB_SCRIPT + await this.#connection.eval( - PUSH_JOB_SCRIPT, + script, 2, keys.data, keys.pending, diff --git a/src/job_dispatcher.ts b/src/job_dispatcher.ts index 9198895..e95d05d 100644 --- a/src/job_dispatcher.ts +++ b/src/job_dispatcher.ts @@ -47,6 +47,7 @@ export class JobDispatcher { #delay?: Duration #priority?: number #groupId?: string + #id?: string /** * Create a new job dispatcher. @@ -148,6 +149,40 @@ export class JobDispatcher { return this } + /** + * Set a custom job ID for deduplication. + * + * When a custom ID is provided, the adapter will silently skip + * the job if one with the same ID already exists in the queue. + * The ID is automatically prefixed with the job name to prevent + * collisions between different job types. + * + * @param jobId - Custom identifier for this job + * @returns This dispatcher for chaining + * + * @example + * ```typescript + * // Prevent duplicate invoice jobs for the same order + * await SendInvoiceJob.dispatch({ orderId: 123 }) + * .id('order-123') + * .run() + * + * // Second dispatch with same ID is silently skipped + * await SendInvoiceJob.dispatch({ orderId: 123 }) + * .id('order-123') + * .run() + * ``` + */ + id(jobId: string): this { + if (!jobId) { + throw new Error('Job ID must be a non-empty string') + } + + this.#id = jobId + + return this + } + /** * Use a specific adapter for this job. * @@ -181,7 +216,7 @@ export class JobDispatcher { * ``` */ async run(): Promise { - const id = randomUUID() + const id = this.#id ? `${this.#name}::${this.#id}` : randomUUID() debug('dispatching job %s with id %s using payload %s', this.#name, id, this.#payload) @@ -197,6 +232,7 @@ export class JobDispatcher { priority: this.#priority, groupId: this.#groupId, createdAt: Date.now(), + ...(this.#id ? { unique: true } : {}), } const message: JobDispatchMessage = { jobs: [jobData], queue: this.#queue, delay: parsedDelay } diff --git a/src/types/main.ts b/src/types/main.ts index 4bc68c9..a4e8c5f 100644 --- a/src/types/main.ts +++ b/src/types/main.ts @@ -133,6 +133,13 @@ export interface JobData { * Injected by OTel plugin at dispatch time. */ traceContext?: Record + + /** + * When true, adapters use atomic insert-if-not-exists semantics + * to silently skip duplicate jobs with the same ID. + * Set automatically when a custom job ID is provided via `.id()`. + */ + unique?: boolean } /** diff --git a/tests/_mocks/memory_adapter.ts b/tests/_mocks/memory_adapter.ts index 08fbc12..c0a1768 100644 --- a/tests/_mocks/memory_adapter.ts +++ b/tests/_mocks/memory_adapter.ts @@ -51,6 +51,11 @@ export class MemoryAdapter implements Adapter { } async pushOn(queue: string, jobData: JobData): Promise { + if (jobData.unique) { + const jobs = this.#queues.get(queue) + if (jobs?.some((j) => j.id === jobData.id)) return + } + if (!this.#queues.has(queue)) { this.#queues.set(queue, []) } @@ -63,6 +68,14 @@ export class MemoryAdapter implements Adapter { } pushLaterOn(queue: string, jobData: JobData, delay: number): Promise { + if (jobData.unique) { + const jobs = this.#queues.get(queue) + if (jobs?.some((j) => j.id === jobData.id)) return Promise.resolve() + + const delayed = this.#delayedJobs.get(queue) + if (delayed?.has(jobData.id)) return Promise.resolve() + } + if (!this.#delayedJobs.has(queue)) { this.#delayedJobs.set(queue, new Map()) } diff --git a/tests/_utils/register_driver_test_suite.ts b/tests/_utils/register_driver_test_suite.ts index ecf5727..f363830 100644 --- a/tests/_utils/register_driver_test_suite.ts +++ b/tests/_utils/register_driver_test_suite.ts @@ -613,7 +613,9 @@ export function registerDriverTestSuite(options: DriverTestSuiteOptions) { assert.isNull(job3) }) - test('recoverStalledJobs should only recover jobs from the targeted queue', async ({ assert }) => { + test('recoverStalledJobs should only recover jobs from the targeted queue', async ({ + assert, + }) => { const adapter = await options.createAdapter() adapter.setWorkerId('worker-1') @@ -1647,4 +1649,112 @@ export function registerDriverTestSuite(options: DriverTestSuiteOptions) { assert.equal(second!.id, 'medium') assert.equal(third!.id, 'low') }) + + test('pushOn with unique flag should skip duplicate job', async ({ assert }) => { + const adapter = await options.createAdapter() + adapter.setWorkerId('worker-1') + + await adapter.pushOn('test-queue', { + id: 'TestJob::order-1', + name: 'TestJob', + payload: { attempt: 1 }, + attempts: 0, + unique: true, + }) + + await adapter.pushOn('test-queue', { + id: 'TestJob::order-1', + name: 'TestJob', + payload: { attempt: 2 }, + attempts: 0, + unique: true, + }) + + const size = await adapter.sizeOf('test-queue') + assert.equal(size, 1) + + const job = await adapter.popFrom('test-queue') + assert.deepEqual(job!.payload, { attempt: 1 }) + }) + + test('pushOn without unique flag should insert normally', async ({ assert }) => { + const adapter = await options.createAdapter() + adapter.setWorkerId('worker-1') + + await adapter.pushOn('test-queue', { + id: 'job-1', + name: 'TestJob', + payload: { data: 'first' }, + attempts: 0, + }) + + await adapter.pushOn('test-queue', { + id: 'job-2', + name: 'TestJob', + payload: { data: 'second' }, + attempts: 0, + }) + + const size = await adapter.sizeOf('test-queue') + assert.equal(size, 2) + }) + + test('pushLaterOn with unique flag should skip duplicate delayed job', async ({ assert }) => { + const adapter = await options.createAdapter() + adapter.setWorkerId('worker-1') + + await adapter.pushLaterOn( + 'test-queue', + { + id: 'TestJob::delayed-1', + name: 'TestJob', + payload: { attempt: 1 }, + attempts: 0, + unique: true, + }, + 60_000 + ) + + await adapter.pushLaterOn( + 'test-queue', + { + id: 'TestJob::delayed-1', + name: 'TestJob', + payload: { attempt: 2 }, + attempts: 0, + unique: true, + }, + 60_000 + ) + + const job = await adapter.getJob('TestJob::delayed-1', 'test-queue') + assert.isNotNull(job) + assert.deepEqual(job!.data.payload, { attempt: 1 }) + }) + + test('pushOn with unique flag should allow same id on different queues', async ({ assert }) => { + const adapter = await options.createAdapter() + adapter.setWorkerId('worker-1') + + await adapter.pushOn('queue-a', { + id: 'TestJob::shared-id', + name: 'TestJob', + payload: { queue: 'a' }, + attempts: 0, + unique: true, + }) + + await adapter.pushOn('queue-b', { + id: 'TestJob::shared-id', + name: 'TestJob', + payload: { queue: 'b' }, + attempts: 0, + unique: true, + }) + + const sizeA = await adapter.sizeOf('queue-a') + const sizeB = await adapter.sizeOf('queue-b') + assert.equal(sizeA, 1) + assert.equal(sizeB, 1) + }) } diff --git a/tests/fake_adapter.spec.ts b/tests/fake_adapter.spec.ts index a4eae64..f6d1383 100644 --- a/tests/fake_adapter.spec.ts +++ b/tests/fake_adapter.spec.ts @@ -91,6 +91,64 @@ test.group('FakeAdapter', () => { await adapter.destroy() }) + test('should skip duplicate pushOn when unique flag is set', async ({ assert }) => { + const adapter = fake()() + + await adapter.pushOn('default', { + id: 'TestJob::order-1', + name: 'TestJob', + payload: { attempt: 1 }, + attempts: 0, + unique: true, + }) + + await adapter.pushOn('default', { + id: 'TestJob::order-1', + name: 'TestJob', + payload: { attempt: 2 }, + attempts: 0, + unique: true, + }) + + const size = await adapter.size() + assert.equal(size, 1) + adapter.assertPushedCount(1) + + await adapter.destroy() + }) + + test('should skip duplicate pushLaterOn when unique flag is set', async ({ assert }) => { + const adapter = fake()() + + await adapter.pushLaterOn( + 'default', + { + id: 'TestJob::delayed-1', + name: 'TestJob', + payload: { attempt: 1 }, + attempts: 0, + unique: true, + }, + 5000 + ) + + await adapter.pushLaterOn( + 'default', + { + id: 'TestJob::delayed-1', + name: 'TestJob', + payload: { attempt: 2 }, + attempts: 0, + unique: true, + }, + 5000 + ) + + adapter.assertPushedCount(1) + + await adapter.destroy() + }) + test('should support job class matchers', async ({ assert }) => { const adapter = fake()() @@ -128,5 +186,4 @@ test.group('FakeAdapter', () => { await adapter.destroy() }) - }) diff --git a/tests/job_dispatcher.spec.ts b/tests/job_dispatcher.spec.ts index 7c71311..3027c04 100644 --- a/tests/job_dispatcher.spec.ts +++ b/tests/job_dispatcher.spec.ts @@ -317,6 +317,119 @@ test.group('JobDispatcher | groupId', () => { }) }) +test.group('JobDispatcher | custom id', () => { + test('should throw error when id is empty', async ({ assert }) => { + assert.throws( + () => new JobDispatcher('TestJob', { data: 'test' }).id(''), + 'Job ID must be a non-empty string' + ) + }) + + test('should use custom id prefixed with job name', async ({ assert }) => { + const sharedAdapter = memory()() + + await QueueManager.init({ + default: 'memory', + adapters: { memory: () => sharedAdapter }, + }) + + const { jobId } = await new JobDispatcher('SendInvoiceJob', { orderId: 123 }) + .id('order-123') + .run() + + assert.equal(jobId, 'SendInvoiceJob::order-123') + + const job = await sharedAdapter.pop() + assert.isNotNull(job) + assert.equal(job!.id, 'SendInvoiceJob::order-123') + }) + + test('should set unique flag on job data when custom id is provided', async ({ assert }) => { + const sharedAdapter = memory()() + + await QueueManager.init({ + default: 'memory', + adapters: { memory: () => sharedAdapter }, + }) + + await new JobDispatcher('UniqueJob', { data: 'test' }).id('my-id').run() + + const job = await sharedAdapter.pop() + assert.isNotNull(job) + assert.isTrue(job!.unique) + }) + + test('should not set unique flag when no custom id is provided', async ({ assert }) => { + const sharedAdapter = memory()() + + await QueueManager.init({ + default: 'memory', + adapters: { memory: () => sharedAdapter }, + }) + + await new JobDispatcher('RegularJob', { data: 'test' }).run() + + const job = await sharedAdapter.pop() + assert.isNotNull(job) + assert.isUndefined(job!.unique) + }) + + test('should silently skip duplicate job with same custom id', async ({ assert }) => { + const sharedAdapter = memory()() + + await QueueManager.init({ + default: 'memory', + adapters: { memory: () => sharedAdapter }, + }) + + await new JobDispatcher('DedupJob', { attempt: 1 }).id('dedup-1').run() + await new JobDispatcher('DedupJob', { attempt: 2 }).id('dedup-1').run() + + const size = await sharedAdapter.size() + assert.equal(size, 1) + + const job = await sharedAdapter.pop() + assert.deepEqual(job!.payload, { attempt: 1 }) + }) + + test('should allow same custom id for different job names', async ({ assert }) => { + const sharedAdapter = memory()() + + await QueueManager.init({ + default: 'memory', + adapters: { memory: () => sharedAdapter }, + }) + + await new JobDispatcher('JobA', { type: 'a' }).id('same-id').run() + await new JobDispatcher('JobB', { type: 'b' }).id('same-id').run() + + const size = await sharedAdapter.size() + assert.equal(size, 2) + }) + + test('should work with other options like priority and queue', async ({ assert }) => { + const sharedAdapter = memory()() + + await QueueManager.init({ + default: 'memory', + adapters: { memory: () => sharedAdapter }, + }) + + const { jobId } = await new JobDispatcher('PriorityDedupJob', { task: 'important' }) + .id('task-1') + .toQueue('high') + .priority(1) + .run() + + assert.equal(jobId, 'PriorityDedupJob::task-1') + + const job = await sharedAdapter.popFrom('high') + assert.isNotNull(job) + assert.equal(job!.priority, 1) + assert.isTrue(job!.unique) + }) +}) + test.group('JobBatchDispatcher', () => { test('should dispatch multiple jobs correctly', async ({ assert }) => { const sharedAdapter = memory()() From 67d73221922520cd31d77b7c6d7a47bf425c66e3 Mon Sep 17 00:00:00 2001 From: Jignesh Sanghani Date: Fri, 10 Apr 2026 18:48:44 +0530 Subject: [PATCH 2/4] fix: remove unused assert parameter in fake_adapter test --- tests/fake_adapter.spec.ts | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tests/fake_adapter.spec.ts b/tests/fake_adapter.spec.ts index f6d1383..077977f 100644 --- a/tests/fake_adapter.spec.ts +++ b/tests/fake_adapter.spec.ts @@ -117,7 +117,7 @@ test.group('FakeAdapter', () => { await adapter.destroy() }) - test('should skip duplicate pushLaterOn when unique flag is set', async ({ assert }) => { + test('should skip duplicate pushLaterOn when unique flag is set', async () => { const adapter = fake()() await adapter.pushLaterOn( From 69c06352e6c7f40ae4ba6faeddefe93e6680cc22 Mon Sep 17 00:00:00 2001 From: Jignesh Sanghani Date: Fri, 10 Apr 2026 23:09:41 +0530 Subject: [PATCH 3/4] fix: check all job states in dedup guard for fake and memory adapters --- README.md | 3 ++- src/drivers/fake_adapter.ts | 13 +++++-------- tests/_mocks/memory_adapter.ts | 13 +++++-------- 3 files changed, 12 insertions(+), 17 deletions(-) diff --git a/README.md b/README.md index 2add2d0..6c9ab86 100644 --- a/README.md +++ b/README.md @@ -152,10 +152,11 @@ await SendInvoiceJob.dispatch({ orderId: 123 }).id('order-123').run() await SendReceiptJob.dispatch({ orderId: 123 }).id('order-123').run() ``` -Deduplication is atomic and race-condition-free across all adapters: +Deduplication is atomic and race-condition-free for adapters that support storage-level uniqueness checks: - **Redis**: Uses `HSETNX` (set-if-not-exists) - **Knex**: Uses `INSERT ... ON CONFLICT DO NOTHING` +- **SyncAdapter**: Executes jobs inline and does not support deduplication > [!NOTE] > Without `.id()`, jobs use auto-generated UUIDs and are never deduplicated. The `.id()` method is only available on single dispatch, not `dispatchMany`. diff --git a/src/drivers/fake_adapter.ts b/src/drivers/fake_adapter.ts index 39bfbb8..c3af06c 100644 --- a/src/drivers/fake_adapter.ts +++ b/src/drivers/fake_adapter.ts @@ -161,8 +161,8 @@ export class FakeAdapter implements Adapter { async pushOn(queue: string, jobData: JobData): Promise { if (jobData.unique) { - const jobs = this.#queues.get(queue) - if (jobs?.some((j) => j.id === jobData.id)) return + const existing = await this.getJob(jobData.id, queue) + if (existing) return } this.#recordPush(queue, jobData) @@ -173,13 +173,10 @@ export class FakeAdapter implements Adapter { return this.pushLaterOn('default', jobData, delay) } - pushLaterOn(queue: string, jobData: JobData, delay: number): Promise { + async pushLaterOn(queue: string, jobData: JobData, delay: number): Promise { if (jobData.unique) { - const jobs = this.#queues.get(queue) - if (jobs?.some((j) => j.id === jobData.id)) return Promise.resolve() - - const delayed = this.#delayedJobs.get(queue) - if (delayed?.has(jobData.id)) return Promise.resolve() + const existing = await this.getJob(jobData.id, queue) + if (existing) return } this.#recordPush(queue, jobData, delay) diff --git a/tests/_mocks/memory_adapter.ts b/tests/_mocks/memory_adapter.ts index c0a1768..de18210 100644 --- a/tests/_mocks/memory_adapter.ts +++ b/tests/_mocks/memory_adapter.ts @@ -52,8 +52,8 @@ export class MemoryAdapter implements Adapter { async pushOn(queue: string, jobData: JobData): Promise { if (jobData.unique) { - const jobs = this.#queues.get(queue) - if (jobs?.some((j) => j.id === jobData.id)) return + const existing = await this.getJob(jobData.id, queue) + if (existing) return } if (!this.#queues.has(queue)) { @@ -67,13 +67,10 @@ export class MemoryAdapter implements Adapter { return this.pushLaterOn('default', jobData, delay) } - pushLaterOn(queue: string, jobData: JobData, delay: number): Promise { + async pushLaterOn(queue: string, jobData: JobData, delay: number): Promise { if (jobData.unique) { - const jobs = this.#queues.get(queue) - if (jobs?.some((j) => j.id === jobData.id)) return Promise.resolve() - - const delayed = this.#delayedJobs.get(queue) - if (delayed?.has(jobData.id)) return Promise.resolve() + const existing = await this.getJob(jobData.id, queue) + if (existing) return } if (!this.#delayedJobs.has(queue)) { From b53a6df5035840c8e5cb9ed8832eb83ed0b406a1 Mon Sep 17 00:00:00 2001 From: Jignesh Sanghani Date: Sat, 11 Apr 2026 00:12:14 +0530 Subject: [PATCH 4/4] refactor: replace .id() with .dedup() API for extensible job deduplication --- .gitignore | 3 ++ README.md | 18 +++++----- src/drivers/fake_adapter.ts | 4 +-- src/drivers/knex_adapter.ts | 4 +-- src/drivers/redis_adapter.ts | 12 +++---- src/job_dispatcher.ts | 40 ++++++++++++---------- src/types/main.ts | 10 ++++-- tests/_mocks/memory_adapter.ts | 4 +-- tests/_utils/register_driver_test_suite.ts | 20 +++++------ tests/fake_adapter.spec.ts | 12 +++---- tests/job_dispatcher.spec.ts | 38 ++++++++++---------- 11 files changed, 87 insertions(+), 78 deletions(-) diff --git a/.gitignore b/.gitignore index 3df4c92..9153b8c 100644 --- a/.gitignore +++ b/.gitignore @@ -15,6 +15,9 @@ coverage npm-debug.log yarn-error.log +# OS specific +.DS_Store + # Editors specific .fleet .idea diff --git a/README.md b/README.md index 6c9ab86..a3a0969 100644 --- a/README.md +++ b/README.md @@ -134,22 +134,22 @@ The `groupId` is stored with job data and accessible via `job.data.groupId`. ## Job Deduplication -Prevent the same job from being pushed to the queue twice using custom job IDs: +Prevent the same job from being pushed to the queue twice using `.dedup()`: ```typescript // First dispatch - job is created -await SendInvoiceJob.dispatch({ orderId: 123 }).id('order-123').run() +await SendInvoiceJob.dispatch({ orderId: 123 }).dedup({ id: 'order-123' }).run() -// Second dispatch with same ID - silently skipped -await SendInvoiceJob.dispatch({ orderId: 123 }).id('order-123').run() +// Second dispatch with same dedup ID - silently skipped +await SendInvoiceJob.dispatch({ orderId: 123 }).dedup({ id: 'order-123' }).run() ``` -The custom ID is automatically prefixed with the job name, so different job types can use the same ID without conflicts: +The dedup ID is automatically prefixed with the job name, so different job types can use the same ID without conflicts: ```typescript // These are two different jobs, no conflict -await SendInvoiceJob.dispatch({ orderId: 123 }).id('order-123').run() -await SendReceiptJob.dispatch({ orderId: 123 }).id('order-123').run() +await SendInvoiceJob.dispatch({ orderId: 123 }).dedup({ id: 'order-123' }).run() +await SendReceiptJob.dispatch({ orderId: 123 }).dedup({ id: 'order-123' }).run() ``` Deduplication is atomic and race-condition-free for adapters that support storage-level uniqueness checks: @@ -159,10 +159,10 @@ Deduplication is atomic and race-condition-free for adapters that support storag - **SyncAdapter**: Executes jobs inline and does not support deduplication > [!NOTE] -> Without `.id()`, jobs use auto-generated UUIDs and are never deduplicated. The `.id()` method is only available on single dispatch, not `dispatchMany`. +> Without `.dedup()`, jobs use auto-generated UUIDs and are never deduplicated. The `.dedup()` method is only available on single dispatch, not `dispatchMany`. > [!TIP] -> When job retention is enabled (`removeOnComplete: false`), completed jobs remain in storage. A re-dispatch with the same custom ID will be silently skipped since the record still exists. With the default retention (`true`), completed jobs are removed immediately, so re-dispatch with the same ID succeeds normally. +> When job retention is enabled (`removeOnComplete: false`), completed jobs remain in storage. A re-dispatch with the same dedup ID will be silently skipped since the record still exists. With the default retention (`true`), completed jobs are removed immediately, so re-dispatch with the same ID succeeds normally. ## Job History & Retention diff --git a/src/drivers/fake_adapter.ts b/src/drivers/fake_adapter.ts index c3af06c..e5f830c 100644 --- a/src/drivers/fake_adapter.ts +++ b/src/drivers/fake_adapter.ts @@ -160,7 +160,7 @@ export class FakeAdapter implements Adapter { } async pushOn(queue: string, jobData: JobData): Promise { - if (jobData.unique) { + if (jobData.dedup) { const existing = await this.getJob(jobData.id, queue) if (existing) return } @@ -174,7 +174,7 @@ export class FakeAdapter implements Adapter { } async pushLaterOn(queue: string, jobData: JobData, delay: number): Promise { - if (jobData.unique) { + if (jobData.dedup) { const existing = await this.getJob(jobData.id, queue) if (existing) return } diff --git a/src/drivers/knex_adapter.ts b/src/drivers/knex_adapter.ts index a421a7c..5e688af 100644 --- a/src/drivers/knex_adapter.ts +++ b/src/drivers/knex_adapter.ts @@ -378,7 +378,7 @@ export class KnexAdapter implements Adapter { score, }) - if (jobData.unique) { + if (jobData.dedup) { await query.onConflict(['id', 'queue']).ignore() } else { await query @@ -400,7 +400,7 @@ export class KnexAdapter implements Adapter { execute_at: executeAt, }) - if (jobData.unique) { + if (jobData.dedup) { await query.onConflict(['id', 'queue']).ignore() } else { await query diff --git a/src/drivers/redis_adapter.ts b/src/drivers/redis_adapter.ts index 4e4bc4b..054ac9e 100644 --- a/src/drivers/redis_adapter.ts +++ b/src/drivers/redis_adapter.ts @@ -36,11 +36,11 @@ const PUSH_JOB_SCRIPT = ` ` /** - * Lua script for pushing a unique job. + * Lua script for pushing a dedup job. * Uses HSETNX to only store data if the job doesn't already exist. * Only adds to pending ZSET if the job was newly created. */ -const PUSH_UNIQUE_JOB_SCRIPT = ` +const PUSH_DEDUP_JOB_SCRIPT = ` local data_key = KEYS[1] local pending_key = KEYS[2] local job_id = ARGV[1] @@ -73,11 +73,11 @@ const PUSH_DELAYED_JOB_SCRIPT = ` ` /** - * Lua script for pushing a unique delayed job. + * Lua script for pushing a dedup delayed job. * Uses HSETNX to only store data if the job doesn't already exist. * Only adds to delayed ZSET if the job was newly created. */ -const PUSH_UNIQUE_DELAYED_JOB_SCRIPT = ` +const PUSH_DEDUP_DELAYED_JOB_SCRIPT = ` local data_key = KEYS[1] local delayed_key = KEYS[2] local job_id = ARGV[1] @@ -660,7 +660,7 @@ export class RedisAdapter implements Adapter { const keys = this.#getKeys(queue) const executeAt = Date.now() + delay - const script = jobData.unique ? PUSH_UNIQUE_DELAYED_JOB_SCRIPT : PUSH_DELAYED_JOB_SCRIPT + const script = jobData.dedup ? PUSH_DEDUP_DELAYED_JOB_SCRIPT : PUSH_DELAYED_JOB_SCRIPT await this.#connection.eval( script, @@ -679,7 +679,7 @@ export class RedisAdapter implements Adapter { const timestamp = Date.now() const score = calculateScore(priority, timestamp) - const script = jobData.unique ? PUSH_UNIQUE_JOB_SCRIPT : PUSH_JOB_SCRIPT + const script = jobData.dedup ? PUSH_DEDUP_JOB_SCRIPT : PUSH_JOB_SCRIPT await this.#connection.eval( script, diff --git a/src/job_dispatcher.ts b/src/job_dispatcher.ts index e95d05d..6a104d6 100644 --- a/src/job_dispatcher.ts +++ b/src/job_dispatcher.ts @@ -15,11 +15,12 @@ import { parse } from './utils.js' * * ``` * Job.dispatch(payload) - * .toQueue('emails') // optional: target queue - * .priority(1) // optional: 1-10, lower = higher priority - * .in('5m') // optional: delay before processing - * .with('redis') // optional: specific adapter - * .run() // dispatch the job + * .toQueue('emails') // optional: target queue + * .priority(1) // optional: 1-10, lower = higher priority + * .in('5m') // optional: delay before processing + * .dedup({ id: 'order-123' }) // optional: deduplication + * .with('redis') // optional: specific adapter + * .run() // dispatch the job * ``` * * @typeParam T - The payload type for this job @@ -47,7 +48,7 @@ export class JobDispatcher { #delay?: Duration #priority?: number #groupId?: string - #id?: string + #dedup?: { id: string } /** * Create a new job dispatcher. @@ -150,35 +151,36 @@ export class JobDispatcher { } /** - * Set a custom job ID for deduplication. + * Configure deduplication for this job. * - * When a custom ID is provided, the adapter will silently skip - * the job if one with the same ID already exists in the queue. + * When deduplication is configured, the adapter will silently skip + * the job if one with the same dedup ID already exists in the queue. * The ID is automatically prefixed with the job name to prevent * collisions between different job types. * - * @param jobId - Custom identifier for this job + * @param options - Deduplication options + * @param options.id - Unique deduplication key * @returns This dispatcher for chaining * * @example * ```typescript * // Prevent duplicate invoice jobs for the same order * await SendInvoiceJob.dispatch({ orderId: 123 }) - * .id('order-123') + * .dedup({ id: 'order-123' }) * .run() * - * // Second dispatch with same ID is silently skipped + * // Second dispatch with same dedup ID is silently skipped * await SendInvoiceJob.dispatch({ orderId: 123 }) - * .id('order-123') + * .dedup({ id: 'order-123' }) * .run() * ``` */ - id(jobId: string): this { - if (!jobId) { - throw new Error('Job ID must be a non-empty string') + dedup(options: { id: string }): this { + if (!options.id) { + throw new Error('Dedup ID must be a non-empty string') } - this.#id = jobId + this.#dedup = options return this } @@ -216,7 +218,7 @@ export class JobDispatcher { * ``` */ async run(): Promise { - const id = this.#id ? `${this.#name}::${this.#id}` : randomUUID() + const id = this.#dedup ? `${this.#name}::${this.#dedup.id}` : randomUUID() debug('dispatching job %s with id %s using payload %s', this.#name, id, this.#payload) @@ -232,7 +234,7 @@ export class JobDispatcher { priority: this.#priority, groupId: this.#groupId, createdAt: Date.now(), - ...(this.#id ? { unique: true } : {}), + ...(this.#dedup ? { dedup: { id: this.#dedup.id } } : {}), } const message: JobDispatchMessage = { jobs: [jobData], queue: this.#queue, delay: parsedDelay } diff --git a/src/types/main.ts b/src/types/main.ts index a4e8c5f..599086b 100644 --- a/src/types/main.ts +++ b/src/types/main.ts @@ -135,11 +135,15 @@ export interface JobData { traceContext?: Record /** - * When true, adapters use atomic insert-if-not-exists semantics + * Deduplication configuration for this job. + * When set, adapters use atomic insert-if-not-exists semantics * to silently skip duplicate jobs with the same ID. - * Set automatically when a custom job ID is provided via `.id()`. + * Set automatically when `.dedup()` is called on the dispatcher. */ - unique?: boolean + dedup?: { + /** The original dedup key provided by the caller (before name-prefixing). */ + id: string + } } /** diff --git a/tests/_mocks/memory_adapter.ts b/tests/_mocks/memory_adapter.ts index de18210..a944610 100644 --- a/tests/_mocks/memory_adapter.ts +++ b/tests/_mocks/memory_adapter.ts @@ -51,7 +51,7 @@ export class MemoryAdapter implements Adapter { } async pushOn(queue: string, jobData: JobData): Promise { - if (jobData.unique) { + if (jobData.dedup) { const existing = await this.getJob(jobData.id, queue) if (existing) return } @@ -68,7 +68,7 @@ export class MemoryAdapter implements Adapter { } async pushLaterOn(queue: string, jobData: JobData, delay: number): Promise { - if (jobData.unique) { + if (jobData.dedup) { const existing = await this.getJob(jobData.id, queue) if (existing) return } diff --git a/tests/_utils/register_driver_test_suite.ts b/tests/_utils/register_driver_test_suite.ts index f363830..ab80635 100644 --- a/tests/_utils/register_driver_test_suite.ts +++ b/tests/_utils/register_driver_test_suite.ts @@ -1650,7 +1650,7 @@ export function registerDriverTestSuite(options: DriverTestSuiteOptions) { assert.equal(third!.id, 'low') }) - test('pushOn with unique flag should skip duplicate job', async ({ assert }) => { + test('pushOn with dedup should skip duplicate job', async ({ assert }) => { const adapter = await options.createAdapter() adapter.setWorkerId('worker-1') @@ -1659,7 +1659,7 @@ export function registerDriverTestSuite(options: DriverTestSuiteOptions) { name: 'TestJob', payload: { attempt: 1 }, attempts: 0, - unique: true, + dedup: { id: 'order-1' }, }) await adapter.pushOn('test-queue', { @@ -1667,7 +1667,7 @@ export function registerDriverTestSuite(options: DriverTestSuiteOptions) { name: 'TestJob', payload: { attempt: 2 }, attempts: 0, - unique: true, + dedup: { id: 'order-1' }, }) const size = await adapter.sizeOf('test-queue') @@ -1677,7 +1677,7 @@ export function registerDriverTestSuite(options: DriverTestSuiteOptions) { assert.deepEqual(job!.payload, { attempt: 1 }) }) - test('pushOn without unique flag should insert normally', async ({ assert }) => { + test('pushOn without dedup should insert normally', async ({ assert }) => { const adapter = await options.createAdapter() adapter.setWorkerId('worker-1') @@ -1699,7 +1699,7 @@ export function registerDriverTestSuite(options: DriverTestSuiteOptions) { assert.equal(size, 2) }) - test('pushLaterOn with unique flag should skip duplicate delayed job', async ({ assert }) => { + test('pushLaterOn with dedup should skip duplicate delayed job', async ({ assert }) => { const adapter = await options.createAdapter() adapter.setWorkerId('worker-1') @@ -1710,7 +1710,7 @@ export function registerDriverTestSuite(options: DriverTestSuiteOptions) { name: 'TestJob', payload: { attempt: 1 }, attempts: 0, - unique: true, + dedup: { id: 'delayed-1' }, }, 60_000 ) @@ -1722,7 +1722,7 @@ export function registerDriverTestSuite(options: DriverTestSuiteOptions) { name: 'TestJob', payload: { attempt: 2 }, attempts: 0, - unique: true, + dedup: { id: 'delayed-1' }, }, 60_000 ) @@ -1732,7 +1732,7 @@ export function registerDriverTestSuite(options: DriverTestSuiteOptions) { assert.deepEqual(job!.data.payload, { attempt: 1 }) }) - test('pushOn with unique flag should allow same id on different queues', async ({ assert }) => { + test('pushOn with dedup should allow same id on different queues', async ({ assert }) => { const adapter = await options.createAdapter() adapter.setWorkerId('worker-1') @@ -1741,7 +1741,7 @@ export function registerDriverTestSuite(options: DriverTestSuiteOptions) { name: 'TestJob', payload: { queue: 'a' }, attempts: 0, - unique: true, + dedup: { id: 'shared-id' }, }) await adapter.pushOn('queue-b', { @@ -1749,7 +1749,7 @@ export function registerDriverTestSuite(options: DriverTestSuiteOptions) { name: 'TestJob', payload: { queue: 'b' }, attempts: 0, - unique: true, + dedup: { id: 'shared-id' }, }) const sizeA = await adapter.sizeOf('queue-a') diff --git a/tests/fake_adapter.spec.ts b/tests/fake_adapter.spec.ts index 077977f..94924b6 100644 --- a/tests/fake_adapter.spec.ts +++ b/tests/fake_adapter.spec.ts @@ -91,7 +91,7 @@ test.group('FakeAdapter', () => { await adapter.destroy() }) - test('should skip duplicate pushOn when unique flag is set', async ({ assert }) => { + test('should skip duplicate pushOn when dedup is set', async ({ assert }) => { const adapter = fake()() await adapter.pushOn('default', { @@ -99,7 +99,7 @@ test.group('FakeAdapter', () => { name: 'TestJob', payload: { attempt: 1 }, attempts: 0, - unique: true, + dedup: { id: 'order-1' }, }) await adapter.pushOn('default', { @@ -107,7 +107,7 @@ test.group('FakeAdapter', () => { name: 'TestJob', payload: { attempt: 2 }, attempts: 0, - unique: true, + dedup: { id: 'order-1' }, }) const size = await adapter.size() @@ -117,7 +117,7 @@ test.group('FakeAdapter', () => { await adapter.destroy() }) - test('should skip duplicate pushLaterOn when unique flag is set', async () => { + test('should skip duplicate pushLaterOn when dedup is set', async () => { const adapter = fake()() await adapter.pushLaterOn( @@ -127,7 +127,7 @@ test.group('FakeAdapter', () => { name: 'TestJob', payload: { attempt: 1 }, attempts: 0, - unique: true, + dedup: { id: 'delayed-1' }, }, 5000 ) @@ -139,7 +139,7 @@ test.group('FakeAdapter', () => { name: 'TestJob', payload: { attempt: 2 }, attempts: 0, - unique: true, + dedup: { id: 'delayed-1' }, }, 5000 ) diff --git a/tests/job_dispatcher.spec.ts b/tests/job_dispatcher.spec.ts index 3027c04..214b4e9 100644 --- a/tests/job_dispatcher.spec.ts +++ b/tests/job_dispatcher.spec.ts @@ -317,15 +317,15 @@ test.group('JobDispatcher | groupId', () => { }) }) -test.group('JobDispatcher | custom id', () => { - test('should throw error when id is empty', async ({ assert }) => { +test.group('JobDispatcher | dedup', () => { + test('should throw error when dedup id is empty', async ({ assert }) => { assert.throws( - () => new JobDispatcher('TestJob', { data: 'test' }).id(''), - 'Job ID must be a non-empty string' + () => new JobDispatcher('TestJob', { data: 'test' }).dedup({ id: '' }), + 'Dedup ID must be a non-empty string' ) }) - test('should use custom id prefixed with job name', async ({ assert }) => { + test('should use dedup id prefixed with job name', async ({ assert }) => { const sharedAdapter = memory()() await QueueManager.init({ @@ -334,7 +334,7 @@ test.group('JobDispatcher | custom id', () => { }) const { jobId } = await new JobDispatcher('SendInvoiceJob', { orderId: 123 }) - .id('order-123') + .dedup({ id: 'order-123' }) .run() assert.equal(jobId, 'SendInvoiceJob::order-123') @@ -344,7 +344,7 @@ test.group('JobDispatcher | custom id', () => { assert.equal(job!.id, 'SendInvoiceJob::order-123') }) - test('should set unique flag on job data when custom id is provided', async ({ assert }) => { + test('should set dedup field on job data when dedup is configured', async ({ assert }) => { const sharedAdapter = memory()() await QueueManager.init({ @@ -352,14 +352,14 @@ test.group('JobDispatcher | custom id', () => { adapters: { memory: () => sharedAdapter }, }) - await new JobDispatcher('UniqueJob', { data: 'test' }).id('my-id').run() + await new JobDispatcher('UniqueJob', { data: 'test' }).dedup({ id: 'my-id' }).run() const job = await sharedAdapter.pop() assert.isNotNull(job) - assert.isTrue(job!.unique) + assert.deepEqual(job!.dedup, { id: 'my-id' }) }) - test('should not set unique flag when no custom id is provided', async ({ assert }) => { + test('should not set dedup field when dedup is not configured', async ({ assert }) => { const sharedAdapter = memory()() await QueueManager.init({ @@ -371,10 +371,10 @@ test.group('JobDispatcher | custom id', () => { const job = await sharedAdapter.pop() assert.isNotNull(job) - assert.isUndefined(job!.unique) + assert.isUndefined(job!.dedup) }) - test('should silently skip duplicate job with same custom id', async ({ assert }) => { + test('should silently skip duplicate job with same dedup id', async ({ assert }) => { const sharedAdapter = memory()() await QueueManager.init({ @@ -382,8 +382,8 @@ test.group('JobDispatcher | custom id', () => { adapters: { memory: () => sharedAdapter }, }) - await new JobDispatcher('DedupJob', { attempt: 1 }).id('dedup-1').run() - await new JobDispatcher('DedupJob', { attempt: 2 }).id('dedup-1').run() + await new JobDispatcher('DedupJob', { attempt: 1 }).dedup({ id: 'dedup-1' }).run() + await new JobDispatcher('DedupJob', { attempt: 2 }).dedup({ id: 'dedup-1' }).run() const size = await sharedAdapter.size() assert.equal(size, 1) @@ -392,7 +392,7 @@ test.group('JobDispatcher | custom id', () => { assert.deepEqual(job!.payload, { attempt: 1 }) }) - test('should allow same custom id for different job names', async ({ assert }) => { + test('should allow same dedup id for different job names', async ({ assert }) => { const sharedAdapter = memory()() await QueueManager.init({ @@ -400,8 +400,8 @@ test.group('JobDispatcher | custom id', () => { adapters: { memory: () => sharedAdapter }, }) - await new JobDispatcher('JobA', { type: 'a' }).id('same-id').run() - await new JobDispatcher('JobB', { type: 'b' }).id('same-id').run() + await new JobDispatcher('JobA', { type: 'a' }).dedup({ id: 'same-id' }).run() + await new JobDispatcher('JobB', { type: 'b' }).dedup({ id: 'same-id' }).run() const size = await sharedAdapter.size() assert.equal(size, 2) @@ -416,7 +416,7 @@ test.group('JobDispatcher | custom id', () => { }) const { jobId } = await new JobDispatcher('PriorityDedupJob', { task: 'important' }) - .id('task-1') + .dedup({ id: 'task-1' }) .toQueue('high') .priority(1) .run() @@ -426,7 +426,7 @@ test.group('JobDispatcher | custom id', () => { const job = await sharedAdapter.popFrom('high') assert.isNotNull(job) assert.equal(job!.priority, 1) - assert.isTrue(job!.unique) + assert.deepEqual(job!.dedup, { id: 'task-1' }) }) })