API Reference

Server - svelte-realtime/server

live(fn)

Wrap a function as a WebSocket RPC endpoint.

import { live } from 'svelte-realtime/server';

export const myFunction = live(async (ctx, arg1, arg2) => {
  return result;
});
ParameterTypeDescription
fn(ctx, ...args) => anyServer function. First arg is always ctx.

Returns: Callable RPC. On the client, becomes an async function.


live.stream(topic, init, options?)

Create a reactive stream subscription.

export const items = live.stream('items', async (ctx) => {
  return db.items.all();
}, { merge: 'crud', key: 'id' });
ParameterTypeDescription
topicstring \| (ctx, ...args) => stringTopic name, or factory for dynamic topics
init(ctx, ...args) => anyReturns initial data for new subscribers
optionsStreamOptionsSee below

StreamOptions:

OptionTypeDefaultDescription
merge'crud' \| 'set' \| 'latest' \| 'presence' \| 'cursor''crud'Merge strategy
keystring'id'Key field for crud merge
prependbooleanfalsePrepend new items (crud)
maxnumber50Max items (latest)
replaybooleanfalseSeq-based replay
onSubscribe(ctx, topic) => void-Fires when a client subscribes
onUnsubscribe(ctx, topic) => void-Fires when a client unsubscribes
access(ctx) => boolean-Subscription access control
filter(ctx, event, data) => boolean-Per-event publish filter
deltaDeltaConfig-Delta sync config
versionnumber-Schema version
migrateRecord<number, fn>-Migration functions

Returns: On the client, a Svelte store (or store factory for dynamic topics).


live.validated(schema, fn)

Validate the first argument against a Zod or Valibot schema.

import { z } from 'zod';

const Input = z.object({ text: z.string().min(1) });
export const create = live.validated(Input, async (ctx, input) => { ... });
ParameterTypeDescription
schemaZodSchema \| ValibotSchemaValidation schema
fn(ctx, validatedInput, ...args) => anyHandler receives validated data

Errors: Throws RpcError with code: 'VALIDATION' and issues array.


live.cron(schedule, fn) / live.cron(schedule, topic, fn)

Scheduled server-side task.

export const cleanup = live.cron('*/5 * * * *', async (ctx) => { ... });
export const refresh = live.cron('*/30 * * * * *', 'stats', async () => stats);
ParameterTypeDescription
schedulestringCron expression: minute hour day month weekday
topicstring(Optional) Auto-publish return value as set event
fn(ctx) => anyTask function. ctx has publish, throttle, debounce, signal but no user or ws.

live.binary(fn)

Handle ArrayBuffer payloads.

export const upload = live.binary(async (ctx, buffer) => { ... });

Binary frames bypass JSON serialization.


live.channel(topic, options?)

Ephemeral pub/sub - no database init function.

export const typing = live.channel('typing:lobby', { merge: 'presence' });
export const cursors = live.channel((ctx, docId) => 'cursors:' + docId, { merge: 'cursor' });

live.derived(topics, fn, options?)

Server-side computed stream that recomputes when source topics publish.

export const stats = live.derived(['orders', 'inventory'], async () => {
  return { total: await db.orders.count() };
}, { debounce: 500 });

live.rateLimit(config, fn)

Per-function rate limiting.

export const send = live.rateLimit({ points: 5, window: 10000 }, async (ctx, text) => { ... });
OptionTypeDescription
pointsnumberMax calls per window
windownumberWindow size in ms

live.middleware(fn)

Global middleware. Runs before per-module guards on every call.

live.middleware(async (ctx, next) => {
  if (!ctx.user) throw new LiveError('UNAUTHORIZED');
  return next();
});

guard(...fns)

Per-module auth guard. Export as _guard.

import { guard, LiveError } from 'svelte-realtime/server';

export const _guard = guard(
  (ctx) => { if (!ctx.user) throw new LiveError('UNAUTHORIZED'); }
);

LiveError(code, message?)

Structured error class.

throw new LiveError('NOT_FOUND', 'Item does not exist');
ParameterTypeDescription
codestringMachine-readable error code
messagestringHuman-readable message

createMessage(options?)

Create the WebSocket message handler for hooks.ws.js.

export const message = createMessage({
  platform: (p) => wrappedPlatform,
  async beforeExecute(ws, rpcPath) { ... }
});
OptionTypeDescription
platform(platform) => platformTransform the platform object
beforeExecute(ws, rpcPath) => voidRuns before every RPC. Throw to reject.

The ctx object

PropertyTypeDescription
ctx.userobjectData returned by upgrade()
ctx.wsWebSocketRaw WebSocket connection
ctx.platformobjectAdapter platform API
ctx.cursorstring \| nullCursor from loadMore()
MethodSignatureDescription
ctx.publish(topic, event, data) => voidBroadcast to all subscribers
ctx.throttle(topic, event, data, ms) => voidRate-limited publish
ctx.debounce(topic, event, data, ms) => voidDebounced publish
ctx.signal(userId, event, data) => voidPoint-to-point message
ctx.batch(messages[]) => voidBulk publish

pipe(stream, ...transforms)

Composable server-side stream transforms. Apply filter, sort, limit, and join operations to both initial data and live events.

import { live, pipe } from 'svelte-realtime/server';

export const myNotifications = pipe(
  live.stream('notifications', async (ctx) => {
    return db.notifications.forUser(ctx.user.id);
  }, { merge: 'crud', key: 'id' }),

  pipe.filter((ctx, item) => !item.dismissed),
  pipe.sort('createdAt', 'desc'),
  pipe.limit(20),
  pipe.join('authorId', async (id) => db.users.getName(id), 'authorName')
);
TransformInitial dataLive events
pipe.filter(predicate)Filters the arrayInitial data only
pipe.sort(field, dir)Sorts the arrayInitial data only
pipe.limit(n)Slices to N itemsInitial data only
pipe.join(field, resolver, as)Enriches each itemInitial data only

Piped functions preserve all stream metadata. The client receives already-transformed data.


close

Ready-made close hook for hooks.ws.js. Fires onUnsubscribe for all remaining topics when a connection closes.

export { close } from 'svelte-realtime/server';

unsubscribe

Ready-made unsubscribe hook for hooks.ws.js. Fires onUnsubscribe in real time as each topic is released.

export { unsubscribe } from 'svelte-realtime/server';

setCronPlatform(platform)

Capture the adapter platform so cron jobs can publish. Call once in your hooks.server.js.

import { setCronPlatform } from 'svelte-realtime/server';

export async function handle({ event, resolve }) {
  setCronPlatform(event.platform);
  return resolve(event);
}

onError(handler)

Global error handler for cron jobs, effects, and derived streams.

import { onError } from 'svelte-realtime/server';

onError((path, error) => {
  sentry.captureException(error, { tags: { live: path } });
});

onCronError(handler) still works but is deprecated - use onError instead.


enableSignals(ws)

Enable point-to-point signal delivery for a WebSocket connection.

export function open(ws, { platform }) {
  enableSignals(ws);
}

_activateDerived(platform)

Enable derived stream listeners. Call once during server startup.


live.metrics(registry)

Opt-in Prometheus metrics. Pass a prom-client registry to expose realtime counters and histograms.


live.breaker(options, fn)

Circuit breaker wrapper. Wraps a function with fail-fast behavior when error rates exceed the threshold.


Client - svelte-realtime/client

$live/* virtual imports

import { myRpc, myStream } from '$live/module';
  • live() exports => async functions
  • live.stream() exports => Svelte stores (or factory functions for dynamic topics)
  • live.validated() exports => async functions
  • live.binary() exports => async functions
  • live.channel() exports => Svelte stores (or factory functions)

Stream store API

Property / MethodDescription
$storeCurrent value (undefined → data → { error })
store.loadMore()Fetch next page (if pagination enabled)
store.hasMoreboolean - more pages available
store.optimistic(event, data)Apply optimistic update, returns rollback function
store.enableHistory(max?)Enable undo/redo tracking
store.undo()Undo last mutation
store.redo()Redo last undone mutation
store.canUndoboolean
store.canRedoboolean
store.hydrate(data)Pre-populate with SSR data
store.when(condition)Conditional subscription
store.load(platform)Server-side data fetch for SSR

batch(fn, options?)

import { batch } from 'svelte-realtime/client';
const results = await batch(() => [rpc1(), rpc2()]);
OptionTypeDefaultDescription
sequentialbooleanfalseRun calls in order

Max 50 calls per batch.

combine(...stores, fn)

import { combine } from 'svelte-realtime/client';
const derived = combine(storeA, storeB, (a, b) => ({ ...a, ...b }));

configure(options)

import { configure } from 'svelte-realtime/client';

configure({
  url: 'wss://api.example.com/ws',
  onConnect() { },
  onDisconnect() { },
  beforeReconnect() { },
  offline: { queue: true, maxQueue: 100, maxAge: 60000 }
});

Call once at app startup.

RpcError

Thrown by failed RPC calls.

PropertyTypeDescription
codestringError code from LiveError
messagestringError message
issuesarrayValidation issues (if code === 'VALIDATION')

onDerived

Re-exported from the adapter. Reactive derived topic subscription - subscribes to a topic derived from a reactive store value, auto-switching when the source changes.

import { onDerived } from 'svelte-realtime/client';

Was this page helpful?