API Reference

Server - svelte-realtime/server

live(fn)

Wrap a function as a WebSocket RPC endpoint.

import { live } from 'svelte-realtime/server';

export const myFunction = live(async (ctx, arg1, arg2) => {
  return result;
});
| Parameter | Type | Description |
| --- | --- | --- |
| fn | (ctx, ...args) => any | Server function. First arg is always ctx. |

Returns: Callable RPC. On the client, becomes an async function.


live.stream(topic, init, options?)

Create a reactive stream subscription.

export const items = live.stream('items', async (ctx) => {
  return db.items.all();
}, { merge: 'crud', key: 'id' });
| Parameter | Type | Description |
| --- | --- | --- |
| topic | string \| (ctx, ...args) => string | Topic name, or factory for dynamic topics |
| init | (ctx, ...args) => any | Returns initial data for new subscribers |
| options | StreamOptions | See below |

StreamOptions:

| Option | Type | Default | Description |
| --- | --- | --- | --- |
| merge | 'crud' \| 'set' \| 'latest' \| 'presence' \| 'cursor' | 'crud' | Merge strategy |
| key | string | 'id' | Key field for crud merge |
| prepend | boolean | false | Prepend new items (crud) |
| max | number | 50 | Max items (latest) |
| replay | boolean | false | Seq-based replay |
| args | StandardSchema | - | Schema applied to dynamic-topic factory args before loader runs |
| transform | (ctx, item) => item | - | Projection applied to initial-load AND every live publish |
| coalesceBy | (event, data) => key | - | Per-subscriber same-tick coalesce. Mutually exclusive with volatile |
| volatile | boolean | false | Skip per-topic seq stamping. Mutually exclusive with coalesceBy |
| staleAfterMs | number | - | Re-run init loader if no publish lands within this window |
| invalidateOn | string \| string[] | - | Trigger topic(s) for ctx.invalidate(topic) re-init |
| onError | (err, ctx) => void | - | Per-stream error hook. Defaults to the global onError registry |
| classOfService | string | - | Named load-shedding class registered via live.admission() |
| onSubscribe | (ctx, topic) => void | - | Fires when a client subscribes |
| onUnsubscribe | (ctx, topic, remainingSubscribers) => void | - | Fires when a client unsubscribes. Third arg is post-leave count - use === 0 to tear down upstream feeds |
| access | (ctx) => boolean | - | Subscription access control |
| filter | (ctx, event, data) => boolean | - | Per-event publish filter |
| delta | DeltaConfig | - | Delta sync config |
| version | number | - | Schema version |
| migrate | Record<number, fn> | - | Migration functions |

Returns: On the client, a Svelte store (or store factory for dynamic topics).
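To make the crud options concrete, here is a minimal sketch of how a reducer could apply events keyed by key, with prepend controlling where new items land. It is an illustration under stated assumptions, not the library's implementation: the function name applyCrudEvent and the event names 'updated' and 'deleted' are hypothetical (only 'created' appears elsewhere in this reference).

```javascript
// Hypothetical reducer illustrating merge: 'crud' with `key` and `prepend`.
// The 'updated' and 'deleted' event names are assumed for this sketch.
function applyCrudEvent(items, event, data, { key = 'id', prepend = false } = {}) {
  const index = items.findIndex((item) => item[key] === data[key]);
  switch (event) {
    case 'created':
      if (index !== -1) {
        // Same key already present: replace in place instead of duplicating.
        return items.map((item, i) => (i === index ? data : item));
      }
      return prepend ? [data, ...items] : [...items, data];
    case 'updated':
      // Unknown key: nothing to update.
      if (index === -1) return items;
      return items.map((item, i) => (i === index ? { ...item, ...data } : item));
    case 'deleted':
      return items.filter((item) => item[key] !== data[key]);
    default:
      return items;
  }
}

let items = [{ id: 1, text: 'a' }];
items = applyCrudEvent(items, 'created', { id: 2, text: 'b' }, { prepend: true });
items = applyCrudEvent(items, 'updated', { id: 1, text: 'A' });
items = applyCrudEvent(items, 'deleted', { id: 2 });
console.log(items); // [ { id: 1, text: 'A' } ]
```

Each call returns a new array, which suits store-style consumers that rely on reference changes to detect updates.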


live.validated(schema, fn)

Validate the first argument against a Zod or Valibot schema.

import { z } from 'zod';

const Input = z.object({ text: z.string().min(1) });
export const create = live.validated(Input, async (ctx, input) => { /* ... */ });
| Parameter | Type | Description |
| --- | --- | --- |
| schema | ZodSchema \| ValibotSchema | Validation schema |
| fn | (ctx, validatedInput, ...args) => any | Handler receives validated data |

Errors: Throws RpcError with code: 'VALIDATION' and issues array.


live.cron(schedule, fn) / live.cron(schedule, topic, fn)

Scheduled server-side task.

export const cleanup = live.cron('*/5 * * * *', async (ctx) => { /* ... */ });
export const refresh = live.cron('*/30 * * * * *', 'stats', async () => stats);
| Parameter | Type | Description |
| --- | --- | --- |
| schedule | string | Cron expression: minute hour day month weekday |
| topic | string | (Optional) Auto-publish return value as set event |
| fn | (ctx) => any | Task function. ctx has publish, publishThrottled, publishDebounced, signal but no user / ws / skip (since cron has no per-request key). |

live.binary(fn, options?)

Handle ArrayBuffer payloads.

export const upload = live.binary(async (ctx, buffer) => { /* ... */ }, { maxSize: 5 * 1024 * 1024 });

Binary frames bypass JSON serialization. Default maxSize: 10 * 1024 * 1024 (10 MB).


live.upload(fn, options?)

Streaming uploads with backpressure-aware chunking. See Uploads for the full surface.

export const avatar = live.upload(async (ctx, name, mime) => {
  for await (const chunk of ctx.stream) { /* ... */ }
}, { maxSize: 25 * 1024 * 1024, reauthEvery: 16 * 1024 * 1024 });
| Option | Default | Description |
| --- | --- | --- |
| maxSize | 100 * 1024 * 1024 | Per-upload byte cap. |
| maxConcurrentPerSession | 4 | In-flight uploads per WebSocket. |
| maxConcurrentTotal | Infinity | Process-wide cap on in-flight uploads. |
| maxBufferedChunks | 64 | High-water mark for the per-upload chunk queue. |
| reauthEvery | unset | Re-run module guard every N bytes. |

ctx.stream is AsyncIterable<Uint8Array>. ctx.signal is an AbortSignal that fires on cancel, disconnect, or mid-stream re-auth failure.


live.idempotent(fn, options)

Wrap an RPC so retries with the same idempotencyKey resolve to the cached result. The cache key is namespaced as 'rpc:' + path + ':' + userKey. See RPC -> Idempotent RPCs for the full surface.

| Option | Description |
| --- | --- |
| keyFrom(ctx, ...args) | Derive the cache key. |
| store | Idempotency store (in-memory, Redis, Postgres). |
| ttl | Cache TTL in ms. |
| acquireTtl | How long another caller waits for an in-flight key. |

idempotencyKey longer than 256 chars throws LiveError('INVALID_REQUEST'). Custom keyFrom callbacks must encode tenant scope explicitly.
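The key namespacing and the length cap can be sketched as below. This is an illustration, not the library source: buildCacheKey and tenantScopedKey are hypothetical helpers, the local LiveError class is a stand-in so the snippet runs on its own, ctx.user.tenantId is an assumed field, and treating userKey as the caller-supplied idempotencyKey is an assumption.

```javascript
// Stand-in for the library's LiveError so the sketch is self-contained.
class LiveError extends Error {
  constructor(code, message) {
    super(message ?? code);
    this.code = code;
  }
}

const MAX_IDEMPOTENCY_KEY_LENGTH = 256;

// Hypothetical helper: builds the namespaced key 'rpc:' + path + ':' + userKey.
function buildCacheKey(path, idempotencyKey) {
  if (idempotencyKey.length > MAX_IDEMPOTENCY_KEY_LENGTH) {
    throw new LiveError('INVALID_REQUEST', 'idempotencyKey exceeds 256 chars');
  }
  return 'rpc:' + path + ':' + idempotencyKey;
}

// A keyFrom-style derivation that encodes tenant scope explicitly
// (ctx.user.tenantId is an assumed field for this example).
function tenantScopedKey(ctx, orderId) {
  return `${ctx.user.tenantId}:${orderId}`;
}

const cacheKey = buildCacheKey(
  'orders/create',
  tenantScopedKey({ user: { tenantId: 't1' } }, 'o-42')
);
console.log(cacheKey); // rpc:orders/create:t1:o-42
```

Without the tenant prefix, two tenants that happen to reuse the same order id would collide on one cache entry, which is why custom keyFrom callbacks need to carry that scope themselves.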


live.lock(key, fn, options?)

Per-key serialization. Default in-process implementation; a Redis-backed extension is available via createDistributedLock.

const result = await live.lock(`order:${orderId}`, async () => {
  return processOrder(orderId);
}, { maxWaitMs: 5000 });
| Option | Description |
| --- | --- |
| maxWaitMs | Bounded wait. Rejects with LiveError('LOCK_TIMEOUT', ...). Carries .code, .key, .maxWaitMs. |

The current holder is not interrupted on timeout; subsequent waiters are unaffected. lock.clear() rejects pending waiters with LOCK_CLEARED rather than leaving them hanging.
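The timeout semantics above (holder keeps running, later waiters are unaffected) can be sketched with a per-key promise chain. This is a minimal in-process illustration, not the library's implementation: createKeyedLock and LockTimeoutError are hypothetical names, and lock.clear() is not covered.

```javascript
// Stand-in error carrying the fields the reference lists for LOCK_TIMEOUT.
class LockTimeoutError extends Error {
  constructor(key, maxWaitMs) {
    super(`lock "${key}" not acquired within ${maxWaitMs} ms`);
    this.code = 'LOCK_TIMEOUT';
    this.key = key;
    this.maxWaitMs = maxWaitMs;
  }
}

function createKeyedLock() {
  const tails = new Map(); // key -> promise that settles when the last queued caller is done

  return async function lock(key, fn, { maxWaitMs } = {}) {
    const prev = tails.get(key) ?? Promise.resolve();
    let release;
    const mine = new Promise((resolve) => { release = resolve; });
    const tail = prev.then(() => mine);
    tails.set(key, tail);
    // Drop the map entry once the queue for this key has drained.
    tail.then(() => { if (tails.get(key) === tail) tails.delete(key); });

    if (maxWaitMs === undefined) {
      await prev;
    } else {
      let timer;
      const timeout = new Promise((resolve) => {
        timer = setTimeout(() => resolve(true), maxWaitMs);
      });
      const timedOut = await Promise.race([prev.then(() => false), timeout]);
      clearTimeout(timer);
      if (timedOut) {
        release(); // give up our slot; the holder and later waiters are untouched
        throw new LockTimeoutError(key, maxWaitMs);
      }
    }

    try {
      return await fn();
    } finally {
      release();
    }
  };
}

const lock = createKeyedLock();
const sleep = (ms) => new Promise((resolve) => setTimeout(resolve, ms));

const slow = lock('order:1', () => sleep(50).then(() => 'done'));
const impatient = lock('order:1', () => 'never runs', { maxWaitMs: 10 })
  .catch((err) => err.code);

Promise.all([slow, impatient]).then((results) => console.log(results));
// [ 'done', 'LOCK_TIMEOUT' ]
```

A timed-out waiter resolves its own slot immediately, so the chain behind it still waits only on the current holder.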


live.notify(target, event, data)

Fire-and-forget server-initiated delivery, counterpart to live.push. Returns Promise<void>. Never rejects in normal operation; validation throws synchronously for programming errors.


live.volatile(fn)

Mark a handler as fire-and-forget. Added in svelte-realtime 0.5.8. The matching client call .fireAndForget(...args) sends a frame with no id and returns void synchronously - no Promise allocation, no dedup-Map entry, no pending-Map entry, no timer, no devtools-pending entry.

// src/lib/realtime/cursors.js
export const moveCursor = live.volatile(async (ctx, boardId, pos) => {
  ctx.publish(`board:${boardId}`, 'cursor', pos);
});
// client
moveCursor.fireAndForget(boardId, { x, y });   // returns void synchronously

Server still runs the full handler chain (middleware, guards, rate limits, validation) and metrics still fire. The only difference is _respond() is never called.

Wraps cleanly inside live.rateLimit / live.idempotent / live.breaker / live.validated / live.lock - the framework walks __wrappedFn to find the marker. Calling .fireAndForget() against a plain live() handler also works but emits a one-shot dev warn per path. See RPC -> Volatile RPC.


live.aggregate(sourceTopic, reducers, options)

Reactive aggregations with optional time-windowing. See Aggregates for the full surface.


live.channel(topic, options?)

Ephemeral pub/sub - no database init function.

export const typing = live.channel('typing:lobby', { merge: 'presence' });
export const cursors = live.channel((ctx, docId) => 'cursors:' + docId, { merge: 'cursor' });

live.derived(topics, fn, options?)

Server-side computed stream that recomputes when source topics publish.

export const stats = live.derived(['orders', 'inventory'], async () => {
  return { total: await db.orders.count() };
}, { debounce: 500 });

live.webhook(topic, config)

Bridge external HTTP webhooks (Stripe, GitHub, Slack, Twilio, etc.) into a pub/sub topic. Returns a handler with a .handle({ body, headers, platform }) method to call from a SvelteKit +server.js endpoint.

// src/live/integrations.js
import { live } from 'svelte-realtime/server';

export const stripeEvents = live.webhook('payments', {
  verify({ body, headers }) {
    return stripe.webhooks.constructEvent(body, headers['stripe-signature'], webhookSecret);
  },
  transform(event) {
    if (event.type === 'payment_intent.succeeded') {
      return { event: 'created', data: event.data.object };
    }
    return null; // ignore other event types
  }
});
// src/routes/api/stripe/+server.js
import { stripeEvents } from '$live/integrations';

export async function POST({ request, platform }) {
  const body = await request.text();
  const headers = Object.fromEntries(request.headers);
  const result = await stripeEvents.handle({ body, headers, platform });
  return new Response(result.body, { status: result.status });
}
| Field | Signature | Description |
| --- | --- | --- |
| verify({ body, headers }) | => any \| Promise<any> | Verify the request signature and return the parsed event. Throw to reject (handler returns 400). Async-allowed. |
| transform(event) | => { event, data } \| null \| Promise<...> | Map the verified event to a publish shape. Return null to ignore. Async-allowed - useful if the transform needs to RPUSH into a Redis list or hit a database before publishing. |

verify and transform are both awaited, so an async signature check or a transform that writes durable state before broadcasting is safe; sync handlers keep working unchanged. Returning null from transform drops the event silently (handler returns 200); returning a { event, data } shape calls platform.publish(topic, event, data) and returns 200.
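The status-code flow described above can be sketched as a small stand-alone re-creation. This is not the library source: createWebhook, the response body strings, the x-signature header, and the fake platform are all assumptions made for illustration.

```javascript
// Hypothetical re-creation of the handle() flow: verify -> transform -> publish.
function createWebhook(topic, { verify, transform }) {
  return {
    async handle({ body, headers, platform }) {
      let event;
      try {
        event = await verify({ body, headers }); // sync or async verify both work
      } catch {
        return { status: 400, body: 'invalid signature' }; // body text is assumed
      }
      const out = await transform(event); // sync or async transform both work
      if (out !== null) {
        platform.publish(topic, out.event, out.data);
      }
      // Published and silently ignored events both answer 200.
      return { status: 200, body: 'ok' };
    }
  };
}

// Fake platform that records publishes instead of broadcasting.
const published = [];
const fakePlatform = { publish: (...args) => published.push(args) };

const hook = createWebhook('payments', {
  verify({ body, headers }) {
    if (headers['x-signature'] !== 'valid') throw new Error('bad signature');
    return JSON.parse(body);
  },
  async transform(event) {
    return event.type === 'paid' ? { event: 'created', data: event.data } : null;
  }
});
```

Calling hook.handle({ body, headers, platform: fakePlatform }) with a valid signature and a 'paid' event records one publish on the 'payments' topic; any other event type returns 200 without publishing, and a bad signature returns 400.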


live.rateLimit(config, fn)

Per-function rate limiting.

export const send = live.rateLimit({ points: 5, window: 10000 }, async (ctx, text) => { /* ... */ });
| Option | Type | Description |
| --- | --- | --- |
| points | number | Max calls per window |
| window | number | Window size in ms |
| store | 'redis' \| object | Optional cross-replica rate-limit store. Pass the extensions createRateLimit(...) instance or 'redis' if a process-wide one is configured. |
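As a rough mental model for points and window, the sketch below implements a fixed-window counter with an injectable clock. The windowing algorithm is an assumption (the library may use a sliding window or token bucket), and createFixedWindowLimiter and the 'user:1' key are hypothetical.

```javascript
// Hypothetical fixed-window limiter; the library's actual algorithm may differ.
function createFixedWindowLimiter({ points, window }, now = Date.now) {
  const buckets = new Map(); // key -> { start, used }

  return function tryConsume(key) {
    const t = now();
    let bucket = buckets.get(key);
    // Start a fresh window when none exists or the current one has elapsed.
    if (!bucket || t - bucket.start >= window) {
      bucket = { start: t, used: 0 };
      buckets.set(key, bucket);
    }
    if (bucket.used >= points) return false; // over budget for this window
    bucket.used += 1;
    return true;
  };
}

// Deterministic clock so the example does not depend on real time.
let clock = 0;
const allow = createFixedWindowLimiter({ points: 5, window: 10000 }, () => clock);

const firstSix = Array.from({ length: 6 }, () => allow('user:1'));
console.log(firstSix.join(',')); // true,true,true,true,true,false

clock = 10000; // one full window later
console.log(allow('user:1')); // true
```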

live.rateLimits(map)

Registry-level rate-limit map keyed by RPC path. Same shape as live.rateLimit, applied to many handlers in one declaration:

live.rateLimits({
  'messages/send':   { points: 5,  window: 10_000 },
  'boards/create':   { points: 2,  window: 60_000 },
  'cursors/move':    { points: 60, window: 1_000 }
});

Applies before per-function live.rateLimit wrappers.


live.gate(predicate, fn)

Per-call gate. The predicate runs before the handler body; returning falsy throws LiveError('FORBIDDEN'). Async-safe (Promise<false> correctly denies as of 0.5.0).

export const updateBoard = live.gate(
  async (ctx, boardId) => await isOwner(ctx.user.id, boardId),
  async (ctx, boardId, patch) => db.boards.update(boardId, patch)
);

For composing async sub-predicates, use live.access.any(...) / live.access.all(...).


live.access.* helpers

Subscribe-time access predicates. Sync leaf helpers; async composition is supported.

| Helper | Description |
| --- | --- |
| live.access.user(field?) | Allow only the authenticated user’s own data. |
| live.access.owner(loader) | Allow when the loader returns a record matching ctx.user.id. |
| live.access.role(role) | Allow when ctx.user.role === role. |
| live.access.org(field?) | Allow within the user’s organization scope. |
| live.access.team(field?) | Allow within the user’s team scope. |
| live.access.any(...preds) | Composition: pass if any sub-predicate passes. Async-safe. |
| live.access.all(...preds) | Composition: pass if all sub-predicates pass. Async-safe. |

Pass to live.stream({ access: ... }) or live.room({ guard: ... }).


live.public(fn)

Intent marker. Skips the per-module guard(...) for this one handler. Use sparingly - public RPCs bypass module-wide auth on purpose:

export const ping = live.public(async () => ({ ok: true, t: Date.now() }));

The marker is also picked up by lint tools and the Vite plugin’s .d.ts generator so accidentally-public handlers stand out in code review.


live.publishRateWarning(config | false)

Dev-mode publish-rate sampler control. The sampler tracks per-topic publish rates and warns once when a topic exceeds the threshold. Disable globally with live.publishRateWarning(false).

live.publishRateWarning({ threshold: 1000, intervalMs: 5000 });
live.publishRateWarning(false);  // disable
| Option | Default | Description |
| --- | --- | --- |
| threshold | 1000 | Messages per second above which the warning fires. |
| intervalMs | 5000 | Sampler tick interval. |

Dev-only; no production behaviour.


handleRpc(ws, data, platform, options?)

Low-level RPC handler. Returns true if the message was an RPC call, false otherwise. Use it directly when createMessage is too high-level:

import { handleRpc } from 'svelte-realtime/server';

export function message(ws, { data, platform }) {
  if (handleRpc(ws, data, platform)) return;
  // your custom non-RPC handling
}
| Option | Default | Description |
| --- | --- | --- |
| maxEnvelopeDepth | 64 | Max JSON nesting depth for inbound envelopes. |

live.admission(config) / classOfService / ctx.shed

Realtime-layer load shedding. Register named pressure rules once at startup; tag streams with classOfService: '<name>' to shed new subscribes under matching pressure; call ctx.shed('<name>') per RPC for per-call decisions. Existing subscribers are unaffected - shedding applies to NEW subscribes and RPC calls only.

import { live, LiveError } from 'svelte-realtime/server';

// Register classes once at startup
live.admission({
  classes: {
    background: ['PUBLISH_RATE', 'SUBSCRIBERS', 'MEMORY'],  // shed under any pressure
    nonCritical: ['MEMORY'],                                 // shed only on memory pressure
    realtime: (snapshot) => snapshot.publishRate > 8000     // custom predicate
  }
});

// Stream: new subscribes shed under matching pressure
export const browseList = live.stream('browse:list', loader, {
  merge: 'crud',
  classOfService: 'background'
});

// RPC: per-call decision
export const expensiveSearch = live(async (ctx, query) => {
  if (ctx.shed('background')) {
    throw new LiveError('OVERLOADED', 'Server is busy, try again shortly');
  }
  return search(query);
});

platform.pressure.reason is precedence-ordered (MEMORY > PUBLISH_RATE > SUBSCRIBERS > NONE); string-array rules match when the active reason is in the array. Predicate rules receive the full platform.pressure snapshot. live.admission() validates rules at registration; unknown reasons throw with a [svelte-realtime]-prefixed error so typos fail fast at boot. Pass null to clear.
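The rule-matching behaviour described above can be sketched with three small pure functions. This is an illustration under assumptions, not the library's code: pickReason, validateClasses, and shouldShed are hypothetical names, the exact error wording is invented, and treating an unregistered class as "never shed" is an assumption.

```javascript
// Pressure reasons, highest precedence first (NONE is the fallback).
const REASONS = ['MEMORY', 'PUBLISH_RATE', 'SUBSCRIBERS'];

// Picks the single active reason by precedence: MEMORY > PUBLISH_RATE > SUBSCRIBERS > NONE.
function pickReason(activeReasons) {
  return REASONS.find((reason) => activeReasons.includes(reason)) ?? 'NONE';
}

// Fail-fast validation at registration time; unknown reasons throw.
function validateClasses(classes) {
  for (const [name, rule] of Object.entries(classes)) {
    if (typeof rule === 'function') continue; // predicate rules are accepted as-is
    if (!Array.isArray(rule)) {
      throw new Error(`[svelte-realtime] class "${name}" must be an array or a predicate`);
    }
    for (const reason of rule) {
      if (!REASONS.includes(reason)) {
        throw new Error(`[svelte-realtime] unknown pressure reason "${reason}" in class "${name}"`);
      }
    }
  }
  return classes;
}

// String-array rules match on the active reason; predicates get the full snapshot.
function shouldShed(classes, name, snapshot) {
  const rule = classes[name];
  if (rule === undefined) return false; // assumption: unregistered class never sheds
  if (typeof rule === 'function') return Boolean(rule(snapshot));
  return rule.includes(snapshot.reason);
}

const classes = validateClasses({
  background: ['PUBLISH_RATE', 'SUBSCRIBERS', 'MEMORY'],
  nonCritical: ['MEMORY'],
  realtime: (snapshot) => snapshot.publishRate > 8000
});

const snapshot = { reason: pickReason(['SUBSCRIBERS', 'PUBLISH_RATE']), publishRate: 9000 };
console.log(snapshot.reason);                             // PUBLISH_RATE
console.log(shouldShed(classes, 'background', snapshot));  // true
console.log(shouldShed(classes, 'nonCritical', snapshot)); // false
console.log(shouldShed(classes, 'realtime', snapshot));    // true
```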

Same idea as the cluster-side createAdmissionControl but wired closer to the realtime layer - reach for the realtime variant when the pressure signal you care about is platform.pressure, and for the extension when you need cluster-aware shedding (top-publisher views, sharded subscriber counts).


live.middleware(fn)

Global middleware. Runs before per-module guards on every call.

live.middleware(async (ctx, next) => {
  if (!ctx.user) throw new LiveError('UNAUTHORIZED');
  return next();
});

guard(...fns)

Per-module auth guard. Export as _guard.

import { guard, LiveError } from 'svelte-realtime/server';

export const _guard = guard(
  (ctx) => { if (!ctx.user) throw new LiveError('UNAUTHORIZED'); }
);

LiveError(code, message?)

Structured error class.

throw new LiveError('NOT_FOUND', 'Item does not exist');
| Parameter | Type | Description |
| --- | --- | --- |
| code | string | Machine-readable error code |
| message | string | Human-readable message |

realtime(config?)

One-call setup that returns the standard adapter hook set (open, close, message, init, optional upgrade) and wires setBus, configureCron({ leader }), setCronPlatform, and _activateDerived for you. Added in svelte-realtime 0.5.6.

// src/hooks.ws.js
import { realtime } from 'svelte-realtime/server';

export const { open, close, message, init } = realtime({ bus, leader: leader.isLeader });
export function upgrade({ cookies }) { return validateSession(cookies.session_id) || false; }
| Option | Type | Description |
| --- | --- | --- |
| bus | { wrap, activate? } | Cluster pub/sub bus. When set, every framework publish surface (RPC ctx.publish, cron tick, reactive seam, top-level publish()) routes through bus.wrap(...). |
| leader | () => boolean | Sync predicate gating cron firing. Returning false skips this worker’s tick (cluster leader-election). |
| onError | (scope, err, ctx?) => void | Global error handler for cron, effects, derived, aggregate, webhook. |

Single-replica is realtime() with no config. Layer-1 primitives below are still first-class - the factory is sugar over them for the typical case. See Distributed Pub/Sub for the full clustering reference.


setBus(bus) / getBus() / getPlatform() / publish(...)

Process-wide cluster bus accessors and a top-level publish helper. Added in svelte-realtime 0.5.6.

import { setBus, getBus, getPlatform, publish } from 'svelte-realtime/server';

setBus(myBus);                          // configure once at startup
const currentBus = getBus();            // or null
const composedPlatform = getPlatform(); // null until init({ platform }) fires

// Publish from any context (HTTP endpoint, cron, etc.):
publish('topic', 'event', data);

setBus(bus) and configureCron({ bus }) write the same backing state. publish(topic, event, data, options?) routes through the composed platform - use it from a +server.js HTTP endpoint or any non-WS context to get the same cluster semantics as a publish inside an RPC handler. Pass setBus(null) to clear the bus.


createMessage(options?)

Create the WebSocket message handler for hooks.ws.js.

export const message = createMessage({
  async beforeExecute(ws, rpcPath) { /* ... */ },
  onJsonMessage(ws, msg, platform) { /* ... */ },
  onUnhandled(ws, data, platform) { /* ... */ },
  maxEnvelopeDepth: 64,
  maxJsonDepth: 64
});
| Option | Type | Default | Description |
| --- | --- | --- | --- |
| platform | (platform) => platform | identity | Transform the platform object. Discouraged since 0.5.7 - use setBus(bus) / realtime({ bus }) instead; this option triggers a one-shot dev warn when a process-wide bus is configured. Auto bus-wrap kicks in when this option is unset. |
| beforeExecute | (ws, rpcPath, args) => void | unset | Runs before every RPC. Throw to reject. |
| onJsonMessage | (ws, msg, platform) => void | unset | Receives non-RPC text frames that parsed as a JSON object. Two-tier lookup: fast path uses the adapter’s pre-parsed ctx.msg field; fallback parses locally. Added in 0.5.9. |
| onUnhandled | (ws, data, platform) => void | unset | Receives raw bytes for binary frames, non-JSON text, parse failures, or frames past maxJsonDepth. |
| maxEnvelopeDepth | number | 64 | Max JSON nesting depth for inbound RPC envelopes. Past the cap, the envelope is silently dropped. Pass Infinity to disable. |
| maxJsonDepth | number | 64 | Max nesting depth for onJsonMessage envelopes (added 0.5.9). |

The maxEnvelopeDepth cap is a DoS guard. The adapter’s maxPayloadLength (default 1 MB) bounds the byte size of an envelope, but a few MB worth of {"w":{"w":{"w":...}}} nesting fits comfortably under the byte cap and was previously walked by downstream RPC handlers and any host-app instrumentation that recursively touches the parsed object - both potential stack-overflow paths. The cap walks the parsed object with an iterative stack-based traversal so the depth check itself is stack-safe (a 100k-deep envelope cannot blow up the depth-checker).

Default 64 is well past any realistic application shape. Apps that legitimately receive deeply nested envelopes (rare; usually a sign of an unbounded recursive payload) can raise the cap explicitly.
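The stack-safe depth check described above can be sketched as an iterative traversal with an explicit stack. This is an illustrative version, not the library's code: exceedsDepth is a hypothetical name, and counting the top-level object as depth 1 is an assumption.

```javascript
// Returns true when `value` nests deeper than `maxDepth`.
// Uses an explicit stack instead of recursion, so the check itself cannot
// overflow the call stack no matter how deep the input is.
function exceedsDepth(value, maxDepth) {
  if (maxDepth === Infinity) return false; // cap disabled
  const stack = [[value, 1]]; // [node, depth]; the top-level value is depth 1
  while (stack.length > 0) {
    const [node, depth] = stack.pop();
    if (node === null || typeof node !== 'object') continue; // primitives add no depth
    if (depth > maxDepth) return true;
    for (const child of Object.values(node)) {
      stack.push([child, depth + 1]);
    }
  }
  return false;
}

// Build a 100k-deep envelope iteratively (a recursive builder would overflow).
let deep = {};
for (let i = 0; i < 100000; i++) deep = { w: deep };

console.log(exceedsDepth(deep, 64));                    // true
console.log(exceedsDepth({ a: { b: [1, 2, 3] } }, 64)); // false
```

The traversal returns as soon as one branch crosses the cap, so a hostile envelope costs at most a few dozen stack pushes before it is rejected.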


The ctx object

| Property | Type | Description |
| --- | --- | --- |
| ctx.user | object | Data returned by upgrade() |
| ctx.ws | WebSocket | Raw WebSocket connection |
| ctx.platform | object | Adapter platform API |
| ctx.cursor | string \| null | Cursor from loadMore() |

| Method | Signature | Description |
| --- | --- | --- |
| ctx.publish | (topic, event, data) => void | Broadcast to all subscribers |
| ctx.publishThrottled | (topic, event, data, ms) => void | Rate-limited publish (renamed from ctx.throttle in 0.5.9; old name kept as soft-deprecated alias) |
| ctx.publishDebounced | (topic, event, data, ms) => void | Debounced publish (renamed from ctx.debounce in 0.5.9; old name kept as soft-deprecated alias) |
| ctx.skip | (key, ms) => boolean | Per-key handler gate (LiveContext only). Returns true to skip the call. Added in 0.5.9. |
| ctx.signal | (userId, event, data) => void | Point-to-point message |
| ctx.batch | (messages[]) => void | Bulk publish |

pipe(stream, ...transforms)

Composable server-side stream transforms. Apply filter, sort, limit, and join operations to both initial data and live events.

import { live, pipe } from 'svelte-realtime/server';

export const myNotifications = pipe(
  live.stream('notifications', async (ctx) => {
    return db.notifications.forUser(ctx.user.id);
  }, { merge: 'crud', key: 'id' }),

  pipe.filter((ctx, item) => !item.dismissed),
  pipe.sort('createdAt', 'desc'),
  pipe.limit(20),
  pipe.join('authorId', async (id) => db.users.getName(id), 'authorName')
);
| Transform | Initial data | Live events |
| --- | --- | --- |
| pipe.filter(predicate) | Filters the array | Initial data only |
| pipe.sort(field, dir) | Sorts the array | Initial data only |
| pipe.limit(n) | Slices to N items | Initial data only |
| pipe.join(field, resolver, as) | Enriches each item | Initial data only |

Piped functions preserve all stream metadata. The client receives already-transformed data.


close

Ready-made close hook for hooks.ws.js. Fires onUnsubscribe for all remaining topics when a connection closes.

export { close } from 'svelte-realtime/server';

unsubscribe

Ready-made unsubscribe hook for hooks.ws.js. Fires onUnsubscribe in real time as each topic is released.

export { unsubscribe } from 'svelte-realtime/server';

setCronPlatform(platform)

Capture the adapter platform so cron jobs can publish. Recommended call site is the init lifecycle hook in hooks.ws.js (Lifecycle Hooks).

import { setCronPlatform } from 'svelte-realtime/server';

export function init({ platform }) {
  setCronPlatform(platform);
}

configureCron(config)

Cluster-mode cron primitives. See Cron -> Cluster mode.

| Field | Description |
| --- | --- |
| leader: () => boolean | Sync predicate. Worker skips the tick if false. |
| bus | Pubsub bus consumed structurally as { wrap(platform): wrapped }. |

At least one of leader or bus must be present. configureCron(null) clears any previously installed config.


live.configurePush({ identify?, remoteRegistry? })

Configure cluster-aware live.push / live.notify routing. At least one of identify or remoteRegistry must be provided. Recommended call site is the init hook.


realtimeTransport()

SvelteKit transport hook from svelte-realtime/hooks. Auto-registers serialization for RpcError and LiveError across the SSR / client boundary. Wire from src/hooks.js (NOT hooks.server.js).

// src/hooks.js
import { realtimeTransport } from 'svelte-realtime/hooks';
export const transport = realtimeTransport();

init({ platform }) and shutdown({ platform }) hook contracts

Optional hooks.ws.js exports that fire once per worker. See Lifecycle Hooks.


assert(cond, category, context) and getAssertionCounters()

Production assertions. Categories are listed in Architecture -> Production assertions. getAssertionCounters() returns the in-process snapshot for tests.


Capacity caps

| Constant | Default | Saturation behavior |
| --- | --- | --- |
| MAX_PRESENCE_REF | 1,000,000 | FIFO-evict pending leaves, then drop new joins with one-shot warning. |
| MAX_PUSH_REGISTRY | 1,000,000 | REJECT past cap. |
| MAX_OPTIMISTIC_QUEUE_DEPTH | 1,000,000 | WARN-then-skip. |
| MAX_AGGREGATE_BUCKETS | 1,000 | Module-load validation rejects. |
| TOPIC_WS_COUNTS_WARN_THRESHOLD | tunable | WARN-only. |
| SILENT_TOPIC_WARN_DEDUP_MAX | tunable | Dedup warn cap. |
| PUBLISH_RATE_WARN_DEDUP_MAX | tunable | Dedup warn cap. |

onError(handler)

Global error handler for cron jobs, effects, and derived streams.

import { onError } from 'svelte-realtime/server';

onError((path, error) => {
  sentry.captureException(error, { tags: { live: path } });
});

onCronError(handler) still works but is deprecated - use onError instead.


enableSignals(ws)

Enable point-to-point signal delivery for a WebSocket connection.

export function open(ws, { platform }) {
  enableSignals(ws);
}

_activateDerived(platform)

Enable derived stream listeners. Call once during server startup.


live.metrics(registry)

Opt-in Prometheus metrics. Pass a prom-client registry to expose realtime counters and histograms.


live.breaker(options, fn)

Circuit breaker wrapper. Wraps a function with fail-fast behavior when error rates exceed the threshold.


Client - svelte-realtime/client

$live/* virtual imports

import { myRpc, myStream } from '$live/module';
  • live() exports => async functions
  • live.stream() exports => Svelte stores (or factory functions for dynamic topics)
  • live.validated() exports => async functions
  • live.binary() exports => async functions
  • live.channel() exports => Svelte stores (or factory functions)

Stream store API

| Property / Method | Description |
| --- | --- |
| $store | Current value (T \| undefined). Errors live on .error, never replace the data value. |
| store.error | Readable<RpcError \| null> |
| store.status | Readable<'loading' \| 'connected' \| 'reconnecting' \| 'error'> |
| store.loadMore() | Fetch next page (if pagination enabled) |
| store.hasMore | boolean - more pages available |
| store.optimistic(event, data) | Apply optimistic update, returns rollback function |
| store.mutate(asyncOp, optimisticChange) | Run an async op with always-on queue replay against un-overlaid server state. See Client -> Optimistic updates for the apply-await-rollback contract. |
| store.enableHistory(max?) | Enable undo/redo tracking |
| store.undo() | Undo last mutation |
| store.redo() | Redo last undone mutation |
| store.canUndo | boolean |
| store.canRedo | boolean |
| store.hydrate(data) | Pre-populate with SSR data |
| store.when(condition) | Conditional subscription |
| store.load(platform) | Server-side data fetch for SSR |

Connection state stores

import { status, denials, failure, classifyCloseCode } from 'svelte-realtime/client';
| Export | Description |
| --- | --- |
| status | Readable<'connecting' \| 'open' \| 'suspended' \| 'disconnected' \| 'failed'>. ready() resolves on 'open' or 'suspended'. |
| denials | Readable<{topic, reason, ref} \| null>. Latest subscribe denial. |
| failure | Readable<{kind, code?, status?, reason} \| null>. Discriminated union by kind. |
| classifyCloseCode(code) | Returns 'TERMINAL' \| 'THROTTLE' \| 'RETRY'. |

__upload(path) factory

Browser counterpart to live.upload. See Uploads -> Client.

subscribeAt(stream, { schemaVersion })

From svelte-realtime/test-client. Constructs a parallel store at a chosen schema version for migration tests. See Testing.

realtimeTransport

From svelte-realtime/hooks. SvelteKit transport hook for RpcError / LiveError SSR serialization.

batch(fn, options?)

import { batch } from 'svelte-realtime/client';
const results = await batch(() => [rpc1(), rpc2()]);
| Option | Type | Default | Description |
| --- | --- | --- | --- |
| sequential | boolean | false | Run calls in order |

Max 50 calls per batch.

combine(...stores, fn)

import { combine } from 'svelte-realtime/client';
const derived = combine(storeA, storeB, (a, b) => ({ ...a, ...b }));

onSignal(userId, callback)

Listen for point-to-point signals targeted at this user. Counterpart to server-side ctx.signal(userId, event, data):

import { onSignal } from 'svelte-realtime/client';

const unsubscribe = onSignal(currentUser.id, (event, data) => {
  if (event === 'nudge') showNotification(data.from);
});

Returns an unsubscribe function. Signal delivery requires enableSignals(ws) in the server’s open hook.

configure(options)

import { configure } from 'svelte-realtime/client';

configure({
  url: 'wss://api.example.com/ws',
  onConnect() { },
  onDisconnect() { },
  beforeReconnect() { },
  offline: { queue: true, maxQueue: 100, maxAge: 60000 },
  resumeGraceMs: 60_000,
  volatileBackpressureBytes: 4 * 1024 * 1024,
  timeout: 30_000
});
| Option | Type | Default | Description |
| --- | --- | --- | --- |
| url | string | derived | Override the WebSocket URL |
| onConnect() | function | unset | Called when the WebSocket opens after a reconnect |
| onDisconnect() | function | unset | Called when the WebSocket closes |
| beforeReconnect() | function | unset | Called before each reconnect attempt (can be async) |
| offline | object | unset | Offline queue config: { queue, maxQueue, maxAge } |
| resumeGraceMs | number | 60_000 | Stream resume-grace window (ms). When the last subscriber unsubs, the stream releases its WS subscription but keeps the in-memory data model (currentValue, _lastSeq, cursor, history) for this long so a new subscribe inside the window gap-fills instead of cold-starting. Set to 0 to disable. Added in 0.5.5. |
| volatileBackpressureBytes | number | 4 * 1024 * 1024 | Backpressure cap for .fireAndForget() calls. When WS.bufferedAmount exceeds this, volatile frames are silently dropped and __devtools.volatileDropped ticks. Added in 0.5.8. |
| timeout | number | 30_000 | Default RPC timeout (ms). Per-call .with({ timeout }) overrides. |
| auth | boolean | false | Stamp the bundled POST /__ws/auth preflight + CSRF headers before every upgrade. Set when using SvelteKit authenticate({ cookies }) hook for cookie rotation. See Cloudflare cookies. |
| upload | { frameSize?: number } | derived | Override per-chunk frame size for live.upload. Default auto-derived from platform.maxPayloadLength with 10% headroom. |

Call once at app startup.

RpcError

Thrown by failed RPC calls.

| Property | Type | Description |
| --- | --- | --- |
| code | string | Error code from LiveError |
| message | string | Error message |
| issues | array | Validation issues (if code === 'VALIDATION') |

onDerived

Re-exported from the adapter. Reactive derived topic subscription - subscribes to a topic derived from a reactive store value, auto-switching when the source changes.

import { onDerived } from 'svelte-realtime/client';
