Job Queue

createJobQueue(pgClient, options?) creates a Postgres-backed job queue. Producers call enqueue(); consumers call claim(), do the work, then call complete() or fail(). Claims use SELECT ... FOR UPDATE SKIP LOCKED, so concurrent workers skip rows that another worker is already claiming instead of waiting on them.
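
To make the SKIP LOCKED pattern concrete, here is a minimal sketch of the kind of claim statement a queue like this might run. It is not the library's actual query: the helper buildClaimQuery, the column names (id, type, run_at, priority, locked_until), and the ordering are all assumptions for illustration.

```javascript
// Hypothetical helper for illustration only. The column names (id, type, run_at,
// priority, locked_until) and the "higher priority first" ordering are assumptions,
// not the library's real schema or query.
function buildClaimQuery({ types = [], leaseMs = 30_000 } = {}) {
  const text = `
    UPDATE svti_jobs
       SET locked_until = now() + ($1::int * interval '1 millisecond')
     WHERE id = (
       SELECT id
         FROM svti_jobs
        WHERE run_at <= now()
          AND (locked_until IS NULL OR locked_until < now())
          AND (cardinality($2::text[]) = 0 OR type = ANY($2::text[]))
        ORDER BY priority DESC, run_at ASC
        LIMIT 1
        FOR UPDATE SKIP LOCKED  -- skip rows another worker has locked
     )
    RETURNING *`;
  // $1 is the lease length in milliseconds, $2 the optional list of job types.
  return { text, values: [leaseMs, types] };
}
```

If two workers ran a statement like this at the same moment, the second worker's subquery would skip the row the first has locked and pick the next eligible one rather than blocking.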

For most apps, createTaskRunner is the right interface: it builds on this queue and adds idempotency, retries, and worker-thread execution. Reach for createJobQueue directly only when you need a bare queue without those features.

Setup

import { createPgClient } from 'svelte-adapter-uws-extensions/postgres';
import { createJobQueue } from 'svelte-adapter-uws-extensions/postgres/jobs';

const pg = createPgClient({ connectionString: process.env.DATABASE_URL });
const jobs = await createJobQueue(pg);

The svti_jobs table is created automatically on first use.

Producer

const jobId = await jobs.enqueue({
  type: 'send-email',
  payload: { to: 'alice@example.com', template: 'welcome' }
}, { runAt: new Date(Date.now() + 60_000), priority: 5 });
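
The API table also lists idempotencyKey as an optional enqueue setting. A minimal sketch of using it follows, assuming the key is passed in the options object alongside runAt and priority (the source does not show its exact placement or what happens on a duplicate key). The helper enqueueWelcomeEmail and the key format are hypothetical.

```javascript
// Sketch: derive a stable key from the user so that a retried request describes
// the same logical job. `jobs` is a queue returned by createJobQueue.
// Assumption: idempotencyKey belongs in the options object, like runAt and priority.
async function enqueueWelcomeEmail(jobs, user) {
  return jobs.enqueue(
    { type: 'send-email', payload: { to: user.email, template: 'welcome' } },
    { idempotencyKey: `welcome-email:${user.id}`, priority: 5 }
  );
}
```

Deriving the key from stable data (here the user ID) rather than a random value is what lets a repeated call be recognised as the same job.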

Consumer

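import { setTimeout as sleep } from 'node:timers/promises';

let running = true; // set to false (for example on SIGTERM) to stop after the current job

async function workerLoop() {
  while (running) {
    // Claim one 'send-email' job with a 30-second lease.
    const job = await jobs.claim({ types: ['send-email'], leaseMs: 30_000 });
    if (!job) {
      // Nothing eligible right now: wait a second before polling again.
      await sleep(1000);
      continue;
    }
    try {
      await sendEmail(job.payload); // sendEmail is your own application code
      await jobs.complete(job.id);
    } catch (err) {
      // retry: true re-enqueues the job instead of failing it permanently.
      await jobs.fail(job.id, { reason: err.message, retry: true });
    }
  }
}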
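
The loop above retries every failure the same way. A minimal sketch of a more selective policy follows, using only the fail() fields listed in the API table (reason, retry, runAt). The PermanentError class and the failureOptions helper are hypothetical, and the sketch assumes runAt controls when the retried job becomes eligible again.

```javascript
// Hypothetical marker for errors that should never be retried
// (for example, a malformed recipient address).
class PermanentError extends Error {}

// Build the options object for jobs.fail(): transient errors are retried after
// a fixed delay, permanent ones are failed without a retry.
function failureOptions(err, { retryDelayMs = 30_000, now = Date.now() } = {}) {
  // Thrown values are not always Error instances, so fall back to String().
  const reason = err instanceof Error ? err.message : String(err);
  if (err instanceof PermanentError) {
    return { reason, retry: false };
  }
  return { reason, retry: true, runAt: new Date(now + retryDelayMs) };
}
```

In the worker loop, the catch branch would then call jobs.fail(job.id, failureOptions(err)).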

API

Method                                  Description
enqueue(job, opts?)                     Insert a job. runAt, priority, and idempotencyKey are optional.
claim(opts?)                            Atomically claim the next eligible job for leaseMs ms. Returns null if none is available.
complete(id)                            Mark a claimed job complete and remove it.
fail(id, { reason, retry?, runAt? })    Mark a claimed job failed. With retry: true, the job is re-enqueued.
extend(id, ms)                          Extend a claimed job's lease so it does not expire while work is still in progress.
pending()                               Count of unclaimed jobs.
clear({ olderThanMs? })                 Bulk-delete completed or failed jobs.
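
For jobs that may run longer than their lease, extend(id, ms) can be called periodically while the work is in progress. The following is a minimal sketch of that idea. The helper withLeaseHeartbeat is hypothetical, and the source does not say whether ms is added to the current expiry or measured from the time of the call, so the renewal interval here is a conservative guess.

```javascript
// Sketch: run `work` while renewing the job's lease in the background.
// `jobs` is a queue returned by createJobQueue; `work` is an async function.
async function withLeaseHeartbeat(jobs, jobId, leaseMs, work) {
  const renew = async () => {
    try {
      await jobs.extend(jobId, leaseMs);
    } catch (err) {
      // A failed renewal is logged, not thrown: the work itself may still finish.
      console.error(`lease extension failed for job ${jobId}:`, err);
    }
  };
  // Renew at half the lease length so one slow renewal does not let it lapse.
  const timer = setInterval(renew, leaseMs / 2);
  try {
    return await work();
  } finally {
    // Always stop renewing, whether the work succeeded or threw.
    clearInterval(timer);
  }
}
```

A worker could wrap its handler as withLeaseHeartbeat(jobs, job.id, 30_000, () => sendEmail(job.payload)) and then call complete() or fail() as usual.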

Options

Option            Default        Description
table             'svti_jobs'    Postgres table name.
defaultLeaseMs    30_000         Default claim lease (ms).
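
As a small illustration of these two options, the sketch below validates them before they are handed to createJobQueue. The queueOptions helper and its validation rules are hypothetical and not part of the library; only the option names and defaults come from the table above.

```javascript
// Hypothetical helper: fill in defaults and reject obviously bad values.
// The validation rules are this sketch's own, not the library's.
function queueOptions({ table = 'svti_jobs', defaultLeaseMs = 30_000 } = {}) {
  // Keep table names to plain lowercase identifiers so they are safe to use in SQL.
  if (!/^[a-z_][a-z0-9_]*$/.test(table)) {
    throw new TypeError(`invalid table name: ${table}`);
  }
  if (!Number.isInteger(defaultLeaseMs) || defaultLeaseMs <= 0) {
    throw new TypeError(`defaultLeaseMs must be a positive integer, got ${defaultLeaseMs}`);
  }
  return { table, defaultLeaseMs };
}
```

The result would be passed as the second argument, for example createJobQueue(pg, queueOptions({ table: 'email_jobs', defaultLeaseMs: 60_000 })).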

Each job row carries a request_id column, so request-correlation IDs flow from tasks.enqueue / jobs.enqueue callers through retries and recovery. The ID is available as job.requestId (or ctx.requestId in the task runner). Existing tables gain the column through an idempotent forward migration using ALTER TABLE.
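
One way a consumer might use the correlation ID is to tag its log lines with it. A minimal sketch follows, assuming only that a claimed job exposes id and requestId as described above. The jobLogger helper and the log format are hypothetical.

```javascript
// Sketch: build a logger that prefixes every line with the job ID and the
// request-correlation ID carried on the job. `write` defaults to console.log.
function jobLogger(job, write = console.log) {
  // Jobs enqueued without a correlation ID are logged as "none".
  const prefix = `[job ${job.id}] [req ${job.requestId ?? 'none'}]`;
  return (message) => write(`${prefix} ${message}`);
}
```

Inside the worker loop this could be used as const log = jobLogger(job); followed by log('sending email'), which makes a job's log lines searchable by the ID of the request that enqueued it.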
