Task Runner
createTaskRunner(pgClient, options?) is a durable, idempotent task runner backed by Postgres. Three guarantees:
- Caller-retry idempotency. Two
tasks.run()calls with the sameidempotencyKeyresolve to the same result without running the handler twice. - Worker-crash recovery. A worker that crashes mid-task is recovered by another worker via Postgres
FOR UPDATE SKIP LOCKED(and an optional Redis fence for hard-isolation). - External-service idempotency. A handler that hits an external service (Stripe, Twilio, S3) can pass an idempotency key through to the external API; same key on retry hits the external service’s own dedup.
Setup
import { createPgClient } from 'svelte-adapter-uws-extensions/postgres';
import { createTaskRunner } from 'svelte-adapter-uws-extensions/postgres/tasks';
const pg = createPgClient({ connectionString: process.env.DATABASE_URL });
const tasks = await createTaskRunner(pg); The runner auto-creates a svti_tasks table on first use. Override the table name via { table: 'my_tasks' }.
Register a task
tasks.register('charge-customer', {
async handler(ctx, { customerId, amountCents }) {
const receipt = await stripe.charges.create({
amount: amountCents,
currency: 'usd',
customer: customerId,
idempotency_key: ctx.taskId // external-service idempotency
});
return { receiptId: receipt.id };
},
retry: { maxAttempts: 3, backoff: 'exponential', on: ['network', 'timeout'] }
}); Run a task
const result = await tasks.run('charge-customer', {
customerId: 'cus_123',
amountCents: 2500
}, {
idempotencyKey: requestId
}); tasks.run() blocks until the task completes or fails. For fire-and-forget enqueue + later await:
const taskId = await tasks.enqueue('charge-customer', payload, { idempotencyKey: requestId });
// ... later, possibly on a different worker:
const result = await tasks.await(taskId); Idempotency key namespacing
The cache key passed to the underlying idempotency store is 'task:' + name + ':' + idempotencyKey. Two tasks sharing one client-supplied key cannot share a cache slot. Both run() and enqueue() reject idempotencyKey longer than 256 chars; the stores cap acquire(key) at 1024 chars as defense in depth.
Upgrading from 0.4.x: In-flight cached entries from before the upgrade become invisible after deploy because the namespaced key does not match the old un-namespaced key. See Migration 0.4 to 0.5.
Worker-thread execution
Pass worker: true (or a path to a worker module) on register() to run the handler in a dedicated worker thread. Per-task thread pool, default size 1, idle timeout 30 s.
tasks.register('expensive-render', {
worker: './src/lib/server/render-worker.js',
async handler(ctx, payload) { /* runs in a worker thread */ }
}); Force takeover
tasks.takeover(taskId) forcibly releases a task’s Postgres row lock and Redis fence, allowing another worker to pick it up. Useful for migrations and recovery from a stuck worker that survived the lock-timeout window.
const counts = await tasks.counts(); // { pending, running, committed, failed, total }
const stuck = await tasks.list({ status: 'running', staleAfterMs: 5 * 60_000 });
for (const task of stuck) await tasks.takeover(task.id); API
| Method | Description |
|---|---|
register(name, opts) | Define a task handler. |
run(name, payload, opts?) | Run synchronously; resolves with the handler’s return value. |
enqueue(name, payload, opts?) | Returns a taskId immediately; handler runs at the next dispatch tick. |
await(taskId) | Wait for an enqueued task. |
list(filter?) | Observability list (camelCase, JSON-parsed payload, max limit 1000). |
counts(filter?) | Bucketed counts (pending, running, committed, failed, total). |
takeover(taskId) | Force-release locks; allow another worker to claim the task. |
ready() | Resolves once the runner has migrated svti_tasks and is ready to dispatch. |
Options
| Option | Default | Description |
|---|---|---|
table | 'svti_tasks' | Postgres table name. |
dispatchInterval | 5_000 ms | Poll interval for the async (enqueue) path. |
fence | unset | Optional Redis fence provider (createFence(redis)) for worker-isolation hard-guarantees. |
onStateChange(taskId, state) | unset | Local-worker observer: null -> pending, null -> running, pending -> running, running -> running (retry), running -> committed, running -> failed. Errors caught and logged. |
State transitions
null -> pending (enqueue)
null -> running (run, claim)
pending -> running
running -> running (retry / recovery)
running -> committed (handler resolved)
running -> failed (handler threw past maxAttempts) onStateChange fires for every transition on the local worker.
See also
createJobQueue- the lower-level Postgres job queue this runner builds on.createIdempotencyStore- the store backingidempotencyKey.createFence- Redis fence provider for hard-isolation.
Was this page helpful?