API Reference

Server - svelte-realtime/server

live(fn)

Wrap a function as a WebSocket RPC endpoint.

import { live } from 'svelte-realtime/server';

export const myFunction = live(async (ctx, arg1, arg2) => {
  return result;
});

| Parameter | Type | Description |
| --- | --- | --- |
| fn | (ctx, ...args) => any | Server function. The first argument is always ctx. |

Returns: Callable RPC. On the client, becomes an async function.


live.stream(topic, init, options?)

Create a reactive stream subscription.

export const items = live.stream('items', async (ctx) => {
  return db.items.all();
}, { merge: 'crud', key: 'id' });

| Parameter | Type | Description |
| --- | --- | --- |
| topic | string \| (ctx, ...args) => string | Topic name, or a factory for dynamic topics |
| init | (ctx, ...args) => any | Returns initial data for new subscribers |
| options | StreamOptions | See below |

StreamOptions:

| Option | Type | Default | Description |
| --- | --- | --- | --- |
| merge | 'crud' \| 'set' \| 'latest' \| 'presence' \| 'cursor' | 'crud' | Merge strategy |
| key | string | 'id' | Key field for crud merge |
| prepend | boolean | false | Prepend new items (crud) |
| max | number | 50 | Max items (latest) |
| replay | boolean | false | Seq-based replay |
| onSubscribe | (ctx, topic) => void | - | Fires when a client subscribes |
| onUnsubscribe | (ctx, topic) => void | - | Fires when a client unsubscribes |
| access | (ctx) => boolean | - | Subscription access control |
| filter | (ctx, event, data) => boolean | - | Per-event publish filter |
| delta | DeltaConfig | - | Delta sync config |
| version | number | - | Schema version |
| migrate | Record<number, fn> | - | Migration functions |

Returns: On the client, a Svelte store (or store factory for dynamic topics).
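
To make the merge, key, and prepend options more concrete, the following is a minimal sketch of what a 'crud' merge could look like. The helper applyCrud and the event names 'created', 'updated', and 'deleted' are assumptions made for illustration; this page does not document the actual event names or the internal merge code.

```javascript
// Hypothetical reducer: applies one event to a list, matching items by `key`.
// The event names ('created' | 'updated' | 'deleted') are assumed, not confirmed.
function applyCrud(items, event, data, { key = 'id', prepend = false } = {}) {
  const index = items.findIndex((item) => item[key] === data[key]);
  switch (event) {
    case 'created':
      if (index !== -1) return items; // ignore duplicates
      return prepend ? [data, ...items] : [...items, data];
    case 'updated':
      if (index === -1) return items; // nothing to update
      return items.map((item, i) => (i === index ? { ...item, ...data } : item));
    case 'deleted':
      return items.filter((item) => item[key] !== data[key]);
    default:
      return items;
  }
}

let items = [{ id: 1, text: 'a' }];
items = applyCrud(items, 'created', { id: 2, text: 'b' });
items = applyCrud(items, 'updated', { id: 1, text: 'A' });
items = applyCrud(items, 'deleted', { id: 2 });
console.log(items); // [ { id: 1, text: 'A' } ]
```

Each call returns a new array instead of mutating the old one, which is the usual way to make store subscribers see a change.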


live.validated(schema, fn)

Validate the first argument against a Zod or Valibot schema.

import { z } from 'zod';

const Input = z.object({ text: z.string().min(1) });
export const create = live.validated(Input, async (ctx, input) => { ... });

| Parameter | Type | Description |
| --- | --- | --- |
| schema | ZodSchema \| ValibotSchema | Validation schema |
| fn | (ctx, validatedInput, ...args) => any | Handler receives validated data |

Errors: If validation fails, the call throws an RpcError with code: 'VALIDATION' and an issues array.
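
As a rough illustration of the validate-then-call flow, the sketch below wraps a handler so that the first argument is checked before the handler runs. Everything here is a stand-in: the local LiveError class, the validated helper, and the hand-written Input schema are hypothetical. The schema only mimics the result shape of Zod's safeParse so the example runs without dependencies.

```javascript
// Stand-in error class so the sketch is self-contained (assumption).
class LiveError extends Error {
  constructor(code, message, issues) {
    super(message ?? code);
    this.code = code;
    this.issues = issues;
  }
}

// Hypothetical wrapper: validate the first argument, then call the handler.
// `schema.safeParse` follows Zod's result shape: { success, data } or { success, error }.
function validated(schema, fn) {
  return (ctx, input, ...args) => {
    const result = schema.safeParse(input);
    if (!result.success) {
      throw new LiveError('VALIDATION', 'Invalid input', result.error.issues);
    }
    return fn(ctx, result.data, ...args);
  };
}

// Tiny hand-written schema so the sketch runs without installing Zod.
const Input = {
  safeParse(value) {
    const ok = typeof value?.text === 'string' && value.text.length >= 1;
    return ok
      ? { success: true, data: { text: value.text } }
      : { success: false, error: { issues: [{ path: ['text'], message: 'Required' }] } };
  },
};

const create = validated(Input, (ctx, input) => input.text.toUpperCase());

console.log(create({}, { text: 'hi' })); // HI
try {
  create({}, { text: '' });
} catch (err) {
  console.log(err.code, err.issues.length); // VALIDATION 1
}
```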


live.cron(schedule, fn) / live.cron(schedule, topic, fn)

Scheduled server-side task.

export const cleanup = live.cron('*/5 * * * *', async (ctx) => { ... });
export const refresh = live.cron('*/30 * * * *', 'stats', async () => stats);

| Parameter | Type | Description |
| --- | --- | --- |
| schedule | string | Cron expression: minute hour day month weekday |
| topic | string | (Optional) Auto-publish the return value as a set event |
| fn | (ctx) => any | Task function. ctx has publish, throttle, debounce, and signal, but no user or ws. |
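
The field order in a schedule string is easy to get wrong, so here is a small sketch that names each field. It assumes exactly the five fields listed in the schedule description and handles only a subset of cron syntax ('*', '*/n', and plain numbers). The helpers parseCron and fieldMatches are hypothetical and are not part of the library.

```javascript
// Names for the five fields, in the order given in the schedule description.
const FIELDS = ['minute', 'hour', 'day', 'month', 'weekday'];

// Hypothetical helper: split a cron expression into named fields.
function parseCron(expression) {
  const parts = expression.trim().split(/\s+/);
  if (parts.length !== FIELDS.length) {
    throw new Error(`Expected ${FIELDS.length} fields, got ${parts.length}`);
  }
  return Object.fromEntries(FIELDS.map((name, i) => [name, parts[i]]));
}

// Matches '*', '*/n', or a plain number against a value (a subset of cron syntax).
function fieldMatches(field, value) {
  if (field === '*') return true;
  if (field.startsWith('*/')) return value % Number(field.slice(2)) === 0;
  return Number(field) === value;
}

const schedule = parseCron('*/5 * * * *');
console.log(schedule.minute); // */5
console.log(fieldMatches(schedule.minute, 10)); // true
console.log(fieldMatches(schedule.minute, 12)); // false
```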

live.binary(fn)

Handle ArrayBuffer payloads.

export const upload = live.binary(async (ctx, buffer) => { ... });

Binary frames bypass JSON serialization.


live.channel(topic, options?)

Ephemeral pub/sub - no database init function.

export const typing = live.channel('typing:lobby', { merge: 'presence' });
export const cursors = live.channel((ctx, docId) => 'cursors:' + docId, { merge: 'cursor' });

live.derived(topics, fn, options?)

Server-side computed stream that recomputes when source topics publish.

export const stats = live.derived(['orders', 'inventory'], async () => {
  return { total: await db.orders.count() };
}, { debounce: 500 });

live.rateLimit(config, fn)

Per-function rate limiting.

export const send = live.rateLimit({ points: 5, window: 10000 }, async (ctx, text) => { ... });

| Option | Type | Description |
| --- | --- | --- |
| points | number | Max calls per window |
| window | number | Window size in ms |
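
To show how points and window interact, the sketch below implements a simple fixed-window limiter. This is one possible algorithm, chosen for brevity. Whether the library uses a fixed window, a sliding window, or a token bucket is not stated here, and createLimiter with its per-key bucketing is an assumption.

```javascript
// Hypothetical fixed-window limiter keyed by caller; `now` is injectable for testing.
function createLimiter({ points, window }, now = () => Date.now()) {
  const buckets = new Map(); // key -> { start, used }
  return function allow(key) {
    const t = now();
    let bucket = buckets.get(key);
    if (!bucket || t - bucket.start >= window) {
      bucket = { start: t, used: 0 }; // start a fresh window
      buckets.set(key, bucket);
    }
    if (bucket.used >= points) return false; // over the limit for this window
    bucket.used += 1;
    return true;
  };
}

let clock = 0;
const allow = createLimiter({ points: 2, window: 10000 }, () => clock);
console.log(allow('user-1'), allow('user-1'), allow('user-1')); // true true false
clock = 10000; // the next window begins
console.log(allow('user-1')); // true
```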

live.middleware(fn)

Global middleware. Runs before per-module guards on every call.

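import { live, LiveError } from 'svelte-realtime/server';

live.middleware(async (ctx, next) => {
  // Reject unauthenticated calls before any guard or handler runs.
  if (!ctx.user) throw new LiveError('UNAUTHORIZED');
  return next();
});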
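
The (ctx, next) signature follows the common middleware-chain pattern. As a sketch of how such a chain can be dispatched, the hypothetical compose helper below runs each middleware in order and calls the handler once the last one calls next(). It illustrates the pattern only and is not the library's internal dispatcher.

```javascript
// Hypothetical dispatcher: runs each middleware in order, then the handler.
function compose(middlewares, handler) {
  return function run(ctx, ...args) {
    let index = 0;
    const next = () => {
      const middleware = middlewares[index++];
      return middleware ? middleware(ctx, next) : handler(ctx, ...args);
    };
    return next();
  };
}

const log = [];
const call = compose(
  [
    (ctx, next) => {
      log.push('auth');
      if (!ctx.user) throw new Error('UNAUTHORIZED');
      return next();
    },
    (ctx, next) => {
      log.push('timing');
      return next();
    },
  ],
  (ctx, name) => `hello ${name}`
);

console.log(call({ user: { id: 1 } }, 'Ada')); // hello Ada
console.log(log); // [ 'auth', 'timing' ]
```

Because every step returns the result of next(), the same chain also works when middleware and handlers are async functions.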

guard(...fns)

Per-module auth guard. Export as _guard.

import { guard, LiveError } from 'svelte-realtime/server';

export const _guard = guard(
  (ctx) => { if (!ctx.user) throw new LiveError('UNAUTHORIZED'); }
);

LiveError(code, message?)

Structured error class.

throw new LiveError('NOT_FOUND', 'Item does not exist');

| Parameter | Type | Description |
| --- | --- | --- |
| code | string | Machine-readable error code |
| message | string | Human-readable message |

createMessage(options?)

Create the WebSocket message handler for hooks.ws.js.

export const message = createMessage({
  platform: (p) => wrappedPlatform,
  async beforeExecute(ws, rpcPath) { ... }
});

| Option | Type | Description |
| --- | --- | --- |
| platform | (platform) => platform | Transform the platform object |
| beforeExecute | (ws, rpcPath) => void | Runs before every RPC. Throw to reject. |

The ctx object

| Property | Type | Description |
| --- | --- | --- |
| ctx.user | object | Data returned by upgrade() |
| ctx.ws | WebSocket | Raw WebSocket connection |
| ctx.platform | object | Adapter platform API |
| ctx.cursor | string \| null | Cursor from loadMore() |

| Method | Signature | Description |
| --- | --- | --- |
| ctx.publish | (topic, event, data) => void | Broadcast to all subscribers |
| ctx.throttle | (topic, event, data, ms) => void | Rate-limited publish |
| ctx.debounce | (topic, event, data, ms) => void | Debounced publish |
| ctx.signal | (userId, event, data) => void | Point-to-point message |
| ctx.batch | (messages[]) => void | Bulk publish |

Client - svelte-realtime/client

$live/* virtual imports

import { myRpc, myStream } from '$live/module';
  • live() exports => async functions
  • live.stream() exports => Svelte stores (or factory functions for dynamic topics)
  • live.validated() exports => async functions
  • live.binary() exports => async functions
  • live.channel() exports => Svelte stores (or factory functions)

Stream store API

| Property / Method | Description |
| --- | --- |
| $store | Current value (undefined → data → { error }) |
| store.loadMore() | Fetch next page (if pagination enabled) |
| store.hasMore | boolean - more pages available |
| store.optimistic(event, data) | Apply optimistic update, returns rollback function |
| store.enableHistory(max?) | Enable undo/redo tracking |
| store.undo() | Undo last mutation |
| store.redo() | Redo last undone mutation |
| store.canUndo | boolean |
| store.canRedo | boolean |
| store.hydrate(data) | Pre-populate with SSR data |
| store.load(platform) | Server-side data fetch for SSR |
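
The undo, redo, canUndo, and canRedo members follow the familiar two-stack history pattern. The sketch below shows that pattern in isolation. createHistory is a hypothetical helper, and trimming the oldest entry once max is exceeded is an assumption about what the max argument of enableHistory means.

```javascript
// Hypothetical history tracker: keeps at most `max` past states.
function createHistory(initial, max = Infinity) {
  let present = initial;
  const past = [];
  const future = [];
  return {
    get value() { return present; },
    get canUndo() { return past.length > 0; },
    get canRedo() { return future.length > 0; },
    set(next) {
      past.push(present);
      if (past.length > max) past.shift(); // drop the oldest entry
      present = next;
      future.length = 0; // a new mutation invalidates the redo stack
    },
    undo() {
      if (past.length === 0) return;
      future.push(present);
      present = past.pop();
    },
    redo() {
      if (future.length === 0) return;
      past.push(present);
      present = future.pop();
    },
  };
}

const tracker = createHistory(['a'], 10);
tracker.set(['a', 'b']);
tracker.undo();
console.log(tracker.value, tracker.canRedo); // [ 'a' ] true
tracker.redo();
console.log(tracker.value, tracker.canUndo); // [ 'a', 'b' ] true
```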

batch(fn, options?)

import { batch } from 'svelte-realtime/client';
const results = await batch(() => [rpc1(), rpc2()]);

| Option | Type | Default | Description |
| --- | --- | --- | --- |
| sequential | boolean | false | Run calls in order |

Max 50 calls per batch.
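
Since a batch is capped at 50 calls, larger workloads have to be split first. The sketch below shows one way to do that with a hypothetical chunk helper. It only groups the inputs and does not call the library.

```javascript
const MAX_BATCH = 50; // the per-batch limit stated above

// Hypothetical helper: split a list into groups of at most `size` entries.
function chunk(list, size = MAX_BATCH) {
  const groups = [];
  for (let i = 0; i < list.length; i += size) {
    groups.push(list.slice(i, i + size));
  }
  return groups;
}

const ids = Array.from({ length: 120 }, (_, i) => i);
const groups = chunk(ids);
console.log(groups.map((group) => group.length)); // [ 50, 50, 20 ]
```

Each group can then be passed to its own batch() call, mapping every entry to an RPC invocation inside the callback.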

combine(...stores, fn)

import { combine } from 'svelte-realtime/client';
const derived = combine(storeA, storeB, (a, b) => ({ ...a, ...b }));
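
To illustrate what combining stores means, the following sketch builds a minimal version on top of the Svelte store contract, where subscribe(fn) calls fn immediately with the current value and returns an unsubscribe function. The local writable and combine functions are simplified stand-ins written for this example. They are not the library's implementation.

```javascript
// Minimal store following the Svelte store contract.
function writable(value) {
  const subscribers = new Set();
  return {
    set(next) {
      value = next;
      subscribers.forEach((fn) => fn(value));
    },
    subscribe(fn) {
      subscribers.add(fn);
      fn(value); // deliver the current value immediately
      return () => subscribers.delete(fn);
    },
  };
}

// Hypothetical combine: the last argument is the combiner, the rest are stores.
function combine(...args) {
  const fn = args.pop();
  const stores = args;
  return {
    subscribe(run) {
      const values = new Array(stores.length);
      let ready = false;
      const emit = () => { if (ready) run(fn(...values)); };
      const unsubscribers = stores.map((store, i) =>
        store.subscribe((v) => { values[i] = v; emit(); })
      );
      ready = true; // every store has delivered its initial value
      emit();
      return () => unsubscribers.forEach((unsubscribe) => unsubscribe());
    },
  };
}

const a = writable({ x: 1 });
const b = writable({ y: 2 });
const seen = [];
const merged = combine(a, b, (left, right) => ({ ...left, ...right }));
const stop = merged.subscribe((value) => seen.push(value));
b.set({ y: 3 });
stop();
console.log(seen); // [ { x: 1, y: 2 }, { x: 1, y: 3 } ]
```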

configure(options)

import { configure } from 'svelte-realtime/client';

configure({
  onConnect() { },
  onDisconnect() { },
  beforeReconnect() { },
  offline: { queue: true, maxQueue: 100, maxAge: 60000 }
});

Call once at app startup.
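
The offline options suggest that calls made while disconnected are held and replayed later. The sketch below shows one plausible reading of maxQueue and maxAge. The createOfflineQueue helper is hypothetical, and both rejecting new calls when the queue is full and dropping entries older than maxAge at flush time are assumptions, not documented behavior.

```javascript
// Hypothetical offline queue; `now` is injectable so the sketch is testable.
function createOfflineQueue({ maxQueue = 100, maxAge = 60000 } = {}, now = () => Date.now()) {
  const pending = [];
  return {
    // Returns false when the queue is full and the call was not stored.
    enqueue(call) {
      if (pending.length >= maxQueue) return false;
      pending.push({ call, queuedAt: now() });
      return true;
    },
    // Empties the queue and returns the calls still fresh enough to replay, oldest first.
    flush() {
      const cutoff = now() - maxAge;
      const fresh = pending.filter((entry) => entry.queuedAt >= cutoff);
      pending.length = 0;
      return fresh.map((entry) => entry.call);
    },
  };
}

let time = 0;
const queue = createOfflineQueue({ maxQueue: 2, maxAge: 60000 }, () => time);
console.log(queue.enqueue('send:hello')); // true
time = 30000;
console.log(queue.enqueue('send:world')); // true
console.log(queue.enqueue('send:again')); // false, the queue is full
time = 70000;
console.log(queue.flush()); // [ 'send:world' ]
```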

RpcError

Thrown by failed RPC calls.

| Property | Type | Description |
| --- | --- | --- |
| code | string | Error code from LiveError |
| message | string | Error message |
| issues | array | Validation issues (if code === 'VALIDATION') |
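
On the client, the code property is the natural thing to branch on. The sketch below turns a failed call into a message for the UI. The local RpcError class is a stand-in that carries only the three properties listed above. The describeError helper, the 'UNAUTHORIZED' branch, and the assumption that each issue has a message field are illustrative.

```javascript
// Stand-in carrying the three documented properties (assumption for a runnable demo).
class RpcError extends Error {
  constructor(code, message, issues) {
    super(message);
    this.code = code;
    this.issues = issues;
  }
}

// Hypothetical helper: map an error to a user-facing message.
function describeError(err) {
  if (!(err instanceof RpcError)) return 'Unexpected error';
  switch (err.code) {
    case 'VALIDATION':
      // Assumes each issue exposes a `message` string.
      return (err.issues ?? []).map((issue) => issue.message).join('; ');
    case 'UNAUTHORIZED':
      return 'Please sign in.';
    default:
      return err.message || err.code;
  }
}

const failure = new RpcError('VALIDATION', 'Invalid input', [{ message: 'text is required' }]);
console.log(describeError(failure)); // text is required
console.log(describeError(new RpcError('NOT_FOUND', 'Item does not exist'))); // Item does not exist
```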
