Job Queue
createJobQueue(pgClient, options?) is a Postgres-backed job queue. Producers enqueue(), consumers claim() -> work -> complete() or fail(). Uses SELECT ... FOR UPDATE SKIP LOCKED for lock-free claim across workers.
For most apps, createTaskRunner is the right interface - it builds on this queue and adds idempotency, retries, and worker-thread execution. Reach for createJobQueue directly when you need a bare queue without those features.
Setup
import { createPgClient } from 'svelte-adapter-uws-extensions/postgres';
import { createJobQueue } from 'svelte-adapter-uws-extensions/postgres/jobs';
const pg = createPgClient({ connectionString: process.env.DATABASE_URL });
const jobs = await createJobQueue(pg); Auto-creates svti_jobs table on first use.
Producer
const jobId = await jobs.enqueue({
type: 'send-email',
payload: { to: 'alice@example.com', template: 'welcome' }
}, { runAt: new Date(Date.now() + 60_000), priority: 5 }); Consumer
async function workerLoop() {
while (running) {
const job = await jobs.claim({ types: ['send-email'], leaseMs: 30_000 });
if (!job) {
await sleep(1000);
continue;
}
try {
await sendEmail(job.payload);
await jobs.complete(job.id);
} catch (err) {
await jobs.fail(job.id, { reason: err.message, retry: true });
}
}
} API
| Method | Description |
|---|---|
enqueue(job, opts?) | Insert a job. runAt, priority, idempotencyKey optional. |
claim(opts?) | Atomically claim the next eligible job for leaseMs ms. Returns null if none available. |
complete(id) | Mark a claimed job complete and remove it. |
fail(id, { reason, retry?, runAt? }) | Mark a claimed job failed. With retry: true, re-enqueues. |
extend(id, ms) | Extend the lease past expiration. |
pending() | Count of un-claimed jobs. |
clear({ olderThanMs? }) | Bulk delete completed / failed jobs. |
Options
| Option | Default | Description |
|---|---|---|
table | 'svti_jobs' | Postgres table name. |
defaultLeaseMs | 30_000 | Default claim lease. |
The job row carries request_id so request-correlation IDs flow from tasks.enqueue / jobs.enqueue callers through retry and recovery and become available as job.requestId (or ctx.requestId in the task runner). Idempotent forward migration via ALTER TABLE.
See also
createTaskRunner- higher-level idempotent runner built on this queue.
Was this page helpful?