API Reference
Server - svelte-realtime/server
live(fn)
Wrap a function as a WebSocket RPC endpoint.
import { live } from 'svelte-realtime/server';
export const myFunction = live(async (ctx, arg1, arg2) => {
  return result;
});
| Parameter | Type | Description |
|---|---|---|
| fn | (ctx, ...args) => any | Server function. First arg is always ctx. |
Returns: Callable RPC. On the client, becomes an async function.
live.stream(topic, init, options?)
Create a reactive stream subscription.
export const items = live.stream('items', async (ctx) => {
  return db.items.all();
}, { merge: 'crud', key: 'id' });
| Parameter | Type | Description |
|---|---|---|
| topic | string \| (ctx, ...args) => string | Topic name, or factory for dynamic topics |
| init | (ctx, ...args) => any | Returns initial data for new subscribers |
| options | StreamOptions | See below |
StreamOptions:
| Option | Type | Default | Description |
|---|---|---|---|
| merge | 'crud' \| 'set' \| 'latest' \| 'presence' \| 'cursor' | 'crud' | Merge strategy |
| key | string | 'id' | Key field for crud merge |
| prepend | boolean | false | Prepend new items (crud) |
| max | number | 50 | Max items (latest) |
| replay | boolean | false | Seq-based replay |
| onSubscribe | (ctx, topic) => void | - | Fires when a client subscribes |
| onUnsubscribe | (ctx, topic) => void | - | Fires when a client unsubscribes |
| access | (ctx) => boolean | - | Subscription access control |
| filter | (ctx, event, data) => boolean | - | Per-event publish filter |
| delta | DeltaConfig | - | Delta sync config |
| version | number | - | Schema version |
| migrate | Record<number, fn> | - | Migration functions |
Returns: On the client, a Svelte store (or store factory for dynamic topics).
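To make the merge: 'crud', key, and prepend options more concrete, here is a minimal sketch of how a keyed merge could apply live events to an array. It is not the library's implementation: the helper applyCrudEvent and the event names 'created', 'updated', and 'deleted' are assumptions made for illustration.

```javascript
// Hypothetical helper: apply one live event to a keyed array without mutating it.
// The event names ('created', 'updated', 'deleted') are assumed for this sketch.
function applyCrudEvent(items, event, data, { key = 'id', prepend = false } = {}) {
  const index = items.findIndex((item) => item[key] === data[key]);
  switch (event) {
    case 'created':
      if (index !== -1) return items; // already present, ignore the duplicate
      return prepend ? [data, ...items] : [...items, data];
    case 'updated':
      if (index === -1) return items; // unknown item, nothing to update
      return items.map((item, i) => (i === index ? { ...item, ...data } : item));
    case 'deleted':
      return items.filter((item) => item[key] !== data[key]);
    default:
      return items;
  }
}

let list = [{ id: 1, text: 'a' }];
list = applyCrudEvent(list, 'created', { id: 2, text: 'b' });
list = applyCrudEvent(list, 'updated', { id: 1, text: 'A' });
list = applyCrudEvent(list, 'deleted', { id: 2 });
console.log(list); // [ { id: 1, text: 'A' } ]
```

Returning a new array on every change, rather than mutating in place, is what lets a store notify its subscribers reliably.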
live.validated(schema, fn)
Validate the first argument against a Zod or Valibot schema.
import { z } from 'zod';
const Input = z.object({ text: z.string().min(1) });
export const create = live.validated(Input, async (ctx, input) => { ... });
| Parameter | Type | Description |
|---|---|---|
| schema | ZodSchema \| ValibotSchema | Validation schema |
| fn | (ctx, validatedInput, ...args) => any | Handler receives validated data |
Errors: Throws RpcError with code: 'VALIDATION' and issues array.
live.cron(schedule, fn) / live.cron(schedule, topic, fn)
Scheduled server-side task.
export const cleanup = live.cron('*/5 * * * *', async (ctx) => { ... });
export const refresh = live.cron('*/30 * * * * *', 'stats', async () => stats);
| Parameter | Type | Description |
|---|---|---|
| schedule | string | Cron expression: minute hour day month weekday |
| topic | string | (Optional) Auto-publish return value as set event |
| fn | (ctx) => any | Task function. ctx has publish, throttle, debounce, signal but no user or ws. |
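For readers unfamiliar with step syntax such as */5 in the schedule strings above, the following sketch shows how a single cron field can be matched against a value. The helper matchesCronField is hypothetical and handles only '*', '*/n', and plain numbers; how svelte-realtime itself parses schedules is not described here.

```javascript
// Hypothetical helper: does one cron field match a numeric value?
// Handles only '*', '*/n', and a plain number; ranges and lists are omitted.
function matchesCronField(field, value) {
  if (field === '*') return true;
  if (field.startsWith('*/')) {
    const step = Number(field.slice(2));
    return Number.isInteger(step) && step > 0 && value % step === 0;
  }
  return Number(field) === value;
}

console.log(matchesCronField('*/5', 10)); // true
console.log(matchesCronField('*/5', 12)); // false
console.log(matchesCronField('30', 30)); // true
```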
live.binary(fn)
Handle ArrayBuffer payloads.
export const upload = live.binary(async (ctx, buffer) => { ... });
Binary frames bypass JSON serialization.
live.channel(topic, options?)
Ephemeral pub/sub - no database init function.
export const typing = live.channel('typing:lobby', { merge: 'presence' });
export const cursors = live.channel((ctx, docId) => 'cursors:' + docId, { merge: 'cursor' });
live.derived(topics, fn, options?)
Server-side computed stream that recomputes when source topics publish.
export const stats = live.derived(['orders', 'inventory'], async () => {
  return { total: await db.orders.count() };
}, { debounce: 500 });
live.rateLimit(config, fn)
Per-function rate limiting.
export const send = live.rateLimit({ points: 5, window: 10000 }, async (ctx, text) => { ... });
| Option | Type | Description |
|---|---|---|
| points | number | Max calls per window |
| window | number | Window size in ms |
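The points and window options can be pictured with a small fixed-window counter. This is only a sketch under assumptions: createLimiter is a hypothetical helper, and whether the library uses a fixed window, a sliding window, or another algorithm is not stated in this reference.

```javascript
// Hypothetical fixed-window limiter: allow at most `points` calls per `window` ms.
// `now` is injectable so the behavior can be checked without real timers.
function createLimiter({ points, window }, now = () => Date.now()) {
  const buckets = new Map(); // caller id -> { start, count }
  return function tryConsume(id) {
    const t = now();
    const bucket = buckets.get(id);
    if (!bucket || t - bucket.start >= window) {
      buckets.set(id, { start: t, count: 1 }); // start a new window
      return true;
    }
    if (bucket.count < points) {
      bucket.count += 1;
      return true;
    }
    return false; // over the limit for this window
  };
}

let clock = 0;
const allow = createLimiter({ points: 2, window: 1000 }, () => clock);
console.log(allow('u1'), allow('u1'), allow('u1')); // true true false
clock = 1000;
console.log(allow('u1')); // true
```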
live.middleware(fn)
Global middleware. Runs before per-module guards on every call.
live.middleware(async (ctx, next) => {
  if (!ctx.user) throw new LiveError('UNAUTHORIZED');
  return next();
});
guard(...fns)
Per-module auth guard. Export as _guard.
import { guard, LiveError } from 'svelte-realtime/server';
export const _guard = guard(
  (ctx) => { if (!ctx.user) throw new LiveError('UNAUTHORIZED'); }
);
LiveError(code, message?)
Structured error class.
throw new LiveError('NOT_FOUND', 'Item does not exist');
| Parameter | Type | Description |
|---|---|---|
| code | string | Machine-readable error code |
| message | string | Human-readable message |
createMessage(options?)
Create the WebSocket message handler for hooks.ws.js.
export const message = createMessage({
  platform: (p) => wrappedPlatform,
  async beforeExecute(ws, rpcPath) { ... }
});
| Option | Type | Description |
|---|---|---|
| platform | (platform) => platform | Transform the platform object |
| beforeExecute | (ws, rpcPath) => void | Runs before every RPC. Throw to reject. |
The ctx object
| Property | Type | Description |
|---|---|---|
| ctx.user | object | Data returned by upgrade() |
| ctx.ws | WebSocket | Raw WebSocket connection |
| ctx.platform | object | Adapter platform API |
| ctx.cursor | string \| null | Cursor from loadMore() |
| Method | Signature | Description |
|---|---|---|
| ctx.publish | (topic, event, data) => void | Broadcast to all subscribers |
| ctx.throttle | (topic, event, data, ms) => void | Rate-limited publish |
| ctx.debounce | (topic, event, data, ms) => void | Debounced publish |
| ctx.signal | (userId, event, data) => void | Point-to-point message |
| ctx.batch | (messages[]) => void | Bulk publish |
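As an illustration of what a rate-limited publish with the (topic, event, data, ms) signature might do, the sketch below forwards at most one message per topic in each ms interval and drops the rest. The wrapper createThrottledPublish, the 'move' event name, and the drop-instead-of-queue behavior are all assumptions; the real ctx.throttle may, for example, also deliver the last value once the interval ends.

```javascript
// Hypothetical wrapper: forward at most one publish per topic every `ms` milliseconds.
// `now` is injectable so the behavior can be checked without real timers.
function createThrottledPublish(publish, now = () => Date.now()) {
  const lastSent = new Map(); // topic -> timestamp of the last forwarded publish
  return function throttle(topic, event, data, ms) {
    const t = now();
    const last = lastSent.get(topic);
    if (last !== undefined && t - last < ms) return false; // dropped
    lastSent.set(topic, t);
    publish(topic, event, data);
    return true;
  };
}

const sent = [];
let time = 0;
const throttle = createThrottledPublish(
  (topic, event, data) => sent.push({ topic, event, data }),
  () => time
);
throttle('cursors:doc1', 'move', { x: 1 }, 100); // forwarded
time = 50;
throttle('cursors:doc1', 'move', { x: 2 }, 100); // dropped, only 50 ms later
time = 120;
throttle('cursors:doc1', 'move', { x: 3 }, 100); // forwarded
console.log(sent.map((m) => m.data.x)); // [ 1, 3 ]
```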
pipe(stream, ...transforms)
Composable server-side stream transforms. Apply filter, sort, limit, and join operations to both initial data and live events.
import { live, pipe } from 'svelte-realtime/server';
export const myNotifications = pipe(
  live.stream('notifications', async (ctx) => {
    return db.notifications.forUser(ctx.user.id);
  }, { merge: 'crud', key: 'id' }),
  pipe.filter((ctx, item) => !item.dismissed),
  pipe.sort('createdAt', 'desc'),
  pipe.limit(20),
  pipe.join('authorId', async (id) => db.users.getName(id), 'authorName')
);
| Transform | Initial data | Live events |
|---|---|---|
| pipe.filter(predicate) | Filters the array | Initial data only |
| pipe.sort(field, dir) | Sorts the array | Initial data only |
| pipe.limit(n) | Slices to N items | Initial data only |
| pipe.join(field, resolver, as) | Enriches each item | Initial data only |
Piped functions preserve all stream metadata. The client receives already-transformed data.
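The effect of chaining transforms over initial data can be sketched with plain array functions. The stand-ins below (filter, sort, limit, applyTransforms) are hypothetical and only mirror the shapes shown above; they are not the library's pipe.* implementations, and the asynchronous join step is left out to keep the sketch synchronous.

```javascript
// Hypothetical stand-ins for filter, sort, and limit, applied to initial data only.
const filter = (predicate) => (ctx, items) => items.filter((item) => predicate(ctx, item));
const sort = (field, dir = 'asc') => (ctx, items) =>
  [...items].sort((a, b) => {
    const order = a[field] < b[field] ? -1 : a[field] > b[field] ? 1 : 0;
    return dir === 'desc' ? -order : order;
  });
const limit = (n) => (ctx, items) => items.slice(0, n);

// Run each transform in order over the array returned by the init function.
function applyTransforms(ctx, items, transforms) {
  return transforms.reduce((acc, transform) => transform(ctx, acc), items);
}

const initial = [
  { id: 1, createdAt: 10, dismissed: false },
  { id: 2, createdAt: 30, dismissed: true },
  { id: 3, createdAt: 20, dismissed: false },
];
const result = applyTransforms({}, initial, [
  filter((ctx, item) => !item.dismissed),
  sort('createdAt', 'desc'),
  limit(1),
]);
console.log(result.map((item) => item.id)); // [ 3 ]
```

Order matters: limiting before sorting would keep the first item of the unsorted array rather than the newest one.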
close
Ready-made close hook for hooks.ws.js. Fires onUnsubscribe for all remaining topics when a connection closes.
export { close } from 'svelte-realtime/server';
unsubscribe
Ready-made unsubscribe hook for hooks.ws.js. Fires onUnsubscribe in real time as each topic is released.
export { unsubscribe } from 'svelte-realtime/server';
setCronPlatform(platform)
Capture the adapter platform so cron jobs can publish. Call once in your hooks.server.js.
import { setCronPlatform } from 'svelte-realtime/server';
export async function handle({ event, resolve }) {
  setCronPlatform(event.platform);
  return resolve(event);
}
onError(handler)
Global error handler for cron jobs, effects, and derived streams.
import { onError } from 'svelte-realtime/server';
onError((path, error) => {
  sentry.captureException(error, { tags: { live: path } });
});
onCronError(handler) still works but is deprecated. Use onError instead.
enableSignals(ws)
Enable point-to-point signal delivery for a WebSocket connection.
export function open(ws, { platform }) {
  enableSignals(ws);
}
_activateDerived(platform)
Enable derived stream listeners. Call once during server startup.
live.metrics(registry)
Opt-in Prometheus metrics. Pass a prom-client registry to expose realtime counters and histograms.
live.breaker(options, fn)
Circuit breaker wrapper. Wraps a function with fail-fast behavior when error rates exceed the threshold.
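The fail-fast idea can be sketched as follows. This reference does not list the option names for live.breaker, so threshold and resetMs are hypothetical, and the sketch counts consecutive failures rather than an error rate. It also wraps a synchronous function for brevity, whereas real handlers are async.

```javascript
// Hypothetical breaker: after `threshold` consecutive failures, reject calls
// immediately until `resetMs` has elapsed, then allow one trial call.
function breaker({ threshold, resetMs }, fn, now = () => Date.now()) {
  let failures = 0;
  let openedAt = null; // timestamp at which the circuit opened, or null if closed
  return function guarded(...args) {
    if (openedAt !== null) {
      if (now() - openedAt < resetMs) throw new Error('CIRCUIT_OPEN');
      openedAt = null; // half-open: let one call through
      failures = threshold - 1; // a single failure re-opens the circuit
    }
    try {
      const result = fn(...args);
      failures = 0;
      return result;
    } catch (error) {
      failures += 1;
      if (failures >= threshold) openedAt = now();
      throw error;
    }
  };
}

let clock = 0;
let calls = 0;
const flaky = breaker(
  { threshold: 2, resetMs: 1000 },
  () => { calls += 1; throw new Error('boom'); },
  () => clock
);
const attempt = () => { try { return flaky(); } catch (error) { return error.message; } };
console.log(attempt(), attempt(), attempt()); // boom boom CIRCUIT_OPEN
console.log(calls); // 2
```

The third call never reaches the wrapped function, which is the point of the pattern: a failing dependency is not hit again until the reset interval has passed.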
Client - svelte-realtime/client
$live/* virtual imports
import { myRpc, myStream } from '$live/module';
- live() exports => async functions
- live.stream() exports => Svelte stores (or factory functions for dynamic topics)
- live.validated() exports => async functions
- live.binary() exports => async functions
- live.channel() exports => Svelte stores (or factory functions)
Stream store API
| Property / Method | Description |
|---|---|
| $store | Current value (undefined → data → { error }) |
| store.loadMore() | Fetch next page (if pagination enabled) |
| store.hasMore | boolean - more pages available |
| store.optimistic(event, data) | Apply optimistic update, returns rollback function |
| store.enableHistory(max?) | Enable undo/redo tracking |
| store.undo() | Undo last mutation |
| store.redo() | Redo last undone mutation |
| store.canUndo | boolean |
| store.canRedo | boolean |
| store.hydrate(data) | Pre-populate with SSR data |
| store.when(condition) | Conditional subscription |
| store.load(platform) | Server-side data fetch for SSR |
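The optimistic-update-with-rollback pattern in the table above can be illustrated with a tiny stand-in. createList is a hypothetical object, not the real stream store, and the 'created' and 'deleted' event names are assumptions.

```javascript
// Hypothetical store-like object: optimistic() applies a change at once and
// returns a function that restores the previous value.
function createList(initial) {
  let value = initial;
  return {
    get: () => value,
    optimistic(event, data) {
      const previous = value;
      if (event === 'created') value = [...value, data];
      if (event === 'deleted') value = value.filter((item) => item.id !== data.id);
      return () => { value = previous; }; // rollback
    },
  };
}

const todos = createList([{ id: 1, text: 'write docs' }]);
const rollback = todos.optimistic('created', { id: 'temp', text: 'ship it' });
console.log(todos.get().length); // 2
rollback(); // e.g. after the server call fails
console.log(todos.get().length); // 1
```

In practice the rollback function would typically be called from the catch branch around the RPC that the optimistic update anticipates.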
batch(fn, options?)
import { batch } from 'svelte-realtime/client';
const results = await batch(() => [rpc1(), rpc2()]);
| Option | Type | Default | Description |
|---|---|---|---|
| sequential | boolean | false | Run calls in order |
Max 50 calls per batch.
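Because a batch is capped at 50 calls, larger workloads need to be split into groups first. One way to do that is sketched below; chunk is a hypothetical helper and not part of the library. Each resulting group could then be passed to its own batch() call.

```javascript
// Hypothetical helper: split a list of work items into groups of at most `size`.
function chunk(items, size = 50) {
  const groups = [];
  for (let i = 0; i < items.length; i += size) {
    groups.push(items.slice(i, i + size));
  }
  return groups;
}

const ids = Array.from({ length: 120 }, (_, i) => i);
const groups = chunk(ids);
console.log(groups.map((group) => group.length)); // [ 50, 50, 20 ]
```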
combine(...stores, fn)
import { combine } from 'svelte-realtime/client';
const derived = combine(storeA, storeB, (a, b) => ({ ...a, ...b }));
configure(options)
import { configure } from 'svelte-realtime/client';
configure({
  url: 'wss://api.example.com/ws',
  onConnect() { },
  onDisconnect() { },
  beforeReconnect() { },
  offline: { queue: true, maxQueue: 100, maxAge: 60000 }
});
Call once at app startup.
RpcError
Thrown by failed RPC calls.
| Property | Type | Description |
|---|---|---|
| code | string | Error code from LiveError |
| message | string | Error message |
| issues | array | Validation issues (if code === 'VALIDATION') |
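A sketch of how calling code might branch on these properties follows. The RpcError class here is a local stand-in so the example runs on its own; describeError is a hypothetical helper, and the { message } shape of each issue is an assumption that depends on the schema library in use.

```javascript
// Stand-in for the client error class, carrying the three documented properties.
class RpcError extends Error {
  constructor(code, message, issues) {
    super(message);
    this.code = code;
    this.issues = issues;
  }
}

// Hypothetical helper: turn a failed call into a message for the UI.
function describeError(error) {
  if (!(error instanceof RpcError)) return 'Unexpected error';
  if (error.code === 'VALIDATION') {
    // Assumed issue shape: { message }. Real issues come from the schema library.
    return (error.issues ?? []).map((issue) => issue.message).join('; ');
  }
  return `${error.code}: ${error.message}`;
}

console.log(describeError(new RpcError('NOT_FOUND', 'Item does not exist')));
// NOT_FOUND: Item does not exist
console.log(describeError(new RpcError('VALIDATION', 'Invalid input', [{ message: 'text is required' }])));
// text is required
```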
onDerived
Re-exported from the adapter. Reactive derived topic subscription - subscribes to a topic derived from a reactive store value, auto-switching when the source changes.
import { onDerived } from 'svelte-realtime/client';