Task Runner

createTaskRunner(pgClient, options?) is a durable, idempotent task runner backed by Postgres. Three guarantees:

  • Caller-retry idempotency. Two tasks.run() calls with the same idempotencyKey resolve to the same result without running the handler twice.
  • Worker-crash recovery. A worker that crashes mid-task is recovered by another worker via Postgres FOR UPDATE SKIP LOCKED (and an optional Redis fence for hard-isolation).
  • External-service idempotency. A handler that hits an external service (Stripe, Twilio, S3) can pass an idempotency key through to the external API; same key on retry hits the external service’s own dedup.

Setup

import { createPgClient } from 'svelte-adapter-uws-extensions/postgres';
import { createTaskRunner } from 'svelte-adapter-uws-extensions/postgres/tasks';

const pg = createPgClient({ connectionString: process.env.DATABASE_URL });
const tasks = await createTaskRunner(pg);

The runner auto-creates a svti_tasks table on first use. Override the table name via { table: 'my_tasks' }.

Register a task

tasks.register('charge-customer', {
  async handler(ctx, { customerId, amountCents }) {
    const receipt = await stripe.charges.create({
      amount: amountCents,
      currency: 'usd',
      customer: customerId,
      idempotency_key: ctx.taskId  // external-service idempotency
    });
    return { receiptId: receipt.id };
  },
  retry: { maxAttempts: 3, backoff: 'exponential', on: ['network', 'timeout'] }
});

Run a task

const result = await tasks.run('charge-customer', {
  customerId: 'cus_123',
  amountCents: 2500
}, {
  idempotencyKey: requestId
});

tasks.run() blocks until the task completes or fails. For fire-and-forget enqueue + later await:

const taskId = await tasks.enqueue('charge-customer', payload, { idempotencyKey: requestId });
// ... later, possibly on a different worker:
const result = await tasks.await(taskId);

Idempotency key namespacing

The cache key passed to the underlying idempotency store is 'task:' + name + ':' + idempotencyKey. Two tasks sharing one client-supplied key cannot share a cache slot. Both run() and enqueue() reject idempotencyKey longer than 256 chars; the stores cap acquire(key) at 1024 chars as defense in depth.

Upgrading from 0.4.x: In-flight cached entries from before the upgrade become invisible after deploy because the namespaced key does not match the old un-namespaced key. See Migration 0.4 to 0.5.

Worker-thread execution

Pass worker: true (or a path to a worker module) on register() to run the handler in a dedicated worker thread. Per-task thread pool, default size 1, idle timeout 30 s.

tasks.register('expensive-render', {
  worker: './src/lib/server/render-worker.js',
  async handler(ctx, payload) { /* runs in a worker thread */ }
});

Force takeover

tasks.takeover(taskId) forcibly releases a task’s Postgres row lock and Redis fence, allowing another worker to pick it up. Useful for migrations and recovery from a stuck worker that survived the lock-timeout window.

const counts = await tasks.counts(); // { pending, running, committed, failed, total }
const stuck = await tasks.list({ status: 'running', staleAfterMs: 5 * 60_000 });
for (const task of stuck) await tasks.takeover(task.id);

API

MethodDescription
register(name, opts)Define a task handler.
run(name, payload, opts?)Run synchronously; resolves with the handler’s return value.
enqueue(name, payload, opts?)Returns a taskId immediately; handler runs at the next dispatch tick.
await(taskId)Wait for an enqueued task.
list(filter?)Observability list (camelCase, JSON-parsed payload, max limit 1000).
counts(filter?)Bucketed counts (pending, running, committed, failed, total).
takeover(taskId)Force-release locks; allow another worker to claim the task.
ready()Resolves once the runner has migrated svti_tasks and is ready to dispatch.

Options

OptionDefaultDescription
table'svti_tasks'Postgres table name.
dispatchInterval5_000 msPoll interval for the async (enqueue) path.
fenceunsetOptional Redis fence provider (createFence(redis)) for worker-isolation hard-guarantees.
onStateChange(taskId, state)unsetLocal-worker observer: null -> pending, null -> running, pending -> running, running -> running (retry), running -> committed, running -> failed. Errors caught and logged.

State transitions

null -> pending  (enqueue)
null -> running  (run, claim)
pending -> running
running -> running  (retry / recovery)
running -> committed  (handler resolved)
running -> failed     (handler threw past maxAttempts)

onStateChange fires for every transition on the local worker.

See also

Was this page helpful?