The ctx Object
Every live() function receives ctx as its first argument. It contains the connection, user data, and publishing utilities.
Properties
| Field | Type | Description |
|---|---|---|
| ctx.user | object | Whatever your upgrade() returned |
| ctx.ws | WebSocket | The raw WebSocket connection |
| ctx.platform | object | The adapter platform API |
| ctx.cursor | string \| null | Cursor from a loadMore() call |
Methods
ctx.publish(topic, event, data)
Send an event to all clients subscribed to topic.
ctx.publish('messages', 'created', { id: 1, text: 'hello' });
System topics. ctx.publish() rejects topics starting with __ (the framework-internal prefix used by live.signal, presence, replay, and other plugins) with LiveError('INVALID_TOPIC'). Use ctx.platform.publish(...) directly when you legitimately need to broadcast on a system topic. See Migration 0.4 to 0.5.
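The system-topic rule can be pictured as a simple prefix check. The following is a minimal sketch, not the library's code: LiveErrorSketch and assertUserTopic are hypothetical names, and only the __ prefix and the INVALID_TOPIC code come from the note above.

```javascript
// Sketch only: a stand-in for the kind of check ctx.publish() is described as performing.
// LiveErrorSketch and assertUserTopic are hypothetical names used for illustration.
class LiveErrorSketch extends Error {
  constructor(code, message) {
    super(message);
    this.code = code;
  }
}

const SYSTEM_PREFIX = '__'; // framework-internal prefix named in the note above

function assertUserTopic(topic) {
  if (typeof topic === 'string' && topic.startsWith(SYSTEM_PREFIX)) {
    throw new LiveErrorSketch('INVALID_TOPIC', `"${topic}" is a system topic`);
  }
  return topic; // ordinary topics pass through unchanged
}
```

A topic such as 'messages' passes through, while '__signal:42' throws an error whose code is 'INVALID_TOPIC'.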
ctx.publishThrottled(topic, event, data, ms)
Publish at most once per ms milliseconds. The first call is sent immediately; later calls within the interval are stored, and the last stored value is sent when the interval expires.
export const updatePosition = live(async (ctx, x, y) => {
ctx.publishThrottled('cursors', 'update', { key: ctx.user.id, x, y }, 50);
});
Renamed in 0.5.9. The legacy names ctx.throttle / ctx.debounce still work as soft-deprecated aliases (one-time dev warning per process per name), but the canonical names are ctx.publishThrottled / ctx.publishDebounced. The rename makes “publish” central to the name so the helpers cannot be misread as handler-execution gates. For handler gating, see ctx.skip(key, ms) below.
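The "first call immediately, last value at the end of the interval" behaviour of ctx.publishThrottled can be sketched in isolation. This is a minimal sketch under stated assumptions, not the library's implementation: createThrottledPublisher, the injectable schedule function, and keying the state by topic plus event are all hypothetical choices made for illustration.

```javascript
// Sketch only: leading + trailing throttle for outbound publishes.
// `publish` is any (topic, event, data) function; `schedule` defaults to setTimeout
// and is injectable so the behaviour can be exercised without real timers.
function createThrottledPublisher(publish, schedule = setTimeout) {
  const windows = new Map(); // key -> { pending } for each open interval

  return function publishThrottled(topic, event, data, ms) {
    const key = JSON.stringify([topic, event]); // assumption: one window per topic/event pair
    const open = windows.get(key);

    if (open) {
      // Inside the interval: remember only the most recent payload.
      open.pending = { data };
      return;
    }

    // First call in a while: send immediately and open an interval.
    publish(topic, event, data);
    const entry = { pending: null };
    windows.set(key, entry);

    schedule(() => {
      windows.delete(key);
      // Flush the last stored value, if any call arrived during the interval.
      if (entry.pending) publish(topic, event, entry.pending.data);
    }, ms);
  };
}
```

With three rapid calls carrying x = 1, 2, 3, this sketch sends x = 1 at once and x = 3 when the interval ends; x = 2 is dropped.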
ctx.publishDebounced(topic, event, data, ms)
Wait for ms milliseconds of silence before publishing. Each call resets the timer.
export const saveSearch = live(async (ctx, query) => {
ctx.publishDebounced('search:' + ctx.user.id, 'set', { query }, 300);
});
ctx.skip(key, ms)
Per-key handler gate. Returns true if the key is still within its cooldown window (caller should early-return); false if the call should run. Available on LiveContext only (not CronContext).
export const moveNote = live(async (ctx, noteId, x, y) => {
if (ctx.skip(`move:${noteId}`, 16)) return; // drop calls within 16ms per note
await dbUpdateNote(noteId, x, y);
ctx.publish('notes', 'updated', { noteId, x, y });
});
State is per-replica - this is a CPU/DB shed, not a cluster-wide rate limit. For cluster-wide gating use live.rateLimit({ store: 'redis' }) or the redis/ratelimit extension.
Capped at 5000 active entries with fail-open semantics on overflow (returns false, dev-warns once) so a runaway dynamic-key generator cannot silently start blocking legitimate calls. Throws LiveError('INVALID_ARG', ...) if key is not a string or ms is not a positive finite number.
Pairs semantically with ctx.shed(...) - both return true to signal early-return - so call sites read uniformly:
export const expensive = live(async (ctx, id) => {
if (ctx.shed('background')) return; // global pressure shed
if (ctx.skip(`expensive:${id}`, 250)) return; // per-key handler gate
await doWork(id);
});
This is the primitive developers were reaching for when they wrote ctx.throttle('move:id', 50) thinking it gated handler execution. The old ctx.throttle / ctx.debounce are outbound publish helpers (now renamed to publishThrottled / publishDebounced); ctx.skip is the actual handler gate.
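The per-key cooldown, the entry cap, and the fail-open rule described for ctx.skip can be sketched as a small in-memory gate. This is a sketch under assumptions rather than the library's code: createSkipGate, the injectable now clock, and the purge-before-overflow step are hypothetical, the sketch throws TypeError where the real API throws LiveError('INVALID_ARG', ...), and the one-time dev warning is omitted.

```javascript
// Sketch only: per-key handler gate with a bounded table and fail-open overflow.
function createSkipGate({ now = Date.now, maxEntries = 5000 } = {}) {
  const cooldownEnds = new Map(); // key -> timestamp at which the cooldown ends

  return function skip(key, ms) {
    if (typeof key !== 'string') throw new TypeError('key must be a string');
    if (!Number.isFinite(ms) || ms <= 0) {
      throw new TypeError('ms must be a positive finite number');
    }

    const t = now();
    const until = cooldownEnds.get(key);
    if (until !== undefined && t < until) return true; // still cooling down: caller returns early

    if (until === undefined && cooldownEnds.size >= maxEntries) {
      // Assumption: try to reclaim expired entries before declaring overflow.
      for (const [k, end] of cooldownEnds) {
        if (end <= t) cooldownEnds.delete(k);
      }
      // Fail open: let the call run without tracking it rather than block it.
      if (cooldownEnds.size >= maxEntries) return false;
    }

    cooldownEnds.set(key, t + ms); // start a fresh cooldown for this key
    return false;
  };
}
```

Because the table lives in process memory, the sketch shares the per-replica limitation noted above: two replicas would each allow one call per key per window.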
ctx.signal(userId, event, data)
Point-to-point message to a specific user. Only that user receives it.
export const nudge = live(async (ctx, targetUserId) => {
ctx.signal(targetUserId, 'nudge', { from: ctx.user.name });
});
ctx.batch(messages)
Publish multiple messages in a single call:
export const reset = live(async (ctx, boardId) => {
ctx.batch([
{ topic: `board:${boardId}`, event: 'set', data: [] },
{ topic: `board:${boardId}:presence`, event: 'set', data: [] }
]);
});
Rate limiting
Per-function rate limiting
Use live.rateLimit() to apply a sliding window rate limiter to a single function. If a client exceeds the limit, the call throws a rate-limit error.
export const sendMessage = live.rateLimit({ points: 5, window: 10000 }, async (ctx, text) => {
const msg = await db.messages.insert({ userId: ctx.user.id, text });
ctx.publish('messages', 'created', msg);
return msg;
});
The config object takes two fields:
| Option | Description |
|---|---|
| points | Maximum number of calls allowed within the window |
| window | Sliding window size in milliseconds |
In this example, each client can call sendMessage at most 5 times per 10 seconds. The rate limiter tracks calls per WebSocket connection, so different users have independent limits.
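To make the points / window arithmetic concrete, here is a minimal sketch of a sliding-window limiter keyed per connection. It is not how live.rateLimit() is implemented: createSlidingWindowLimiter, the timestamp-log approach, the injectable now clock, and the resetMs return field are assumptions chosen for illustration.

```javascript
// Sketch only: sliding-window limiter that keeps a log of accepted call times per key.
function createSlidingWindowLimiter({ points, window: windowMs, now = Date.now }) {
  const accepted = new Map(); // key (e.g. a connection id) -> timestamps of accepted calls

  return function consume(key) {
    const t = now();
    // Keep only the calls that still fall inside the sliding window.
    const recent = (accepted.get(key) || []).filter((ts) => t - ts < windowMs);
    const allowed = recent.length < points;
    if (allowed) recent.push(t);
    accepted.set(key, recent);

    // When rejected, report how long until the oldest accepted call leaves the window.
    const resetMs = allowed || recent.length === 0 ? 0 : recent[0] + windowMs - t;
    return { allowed, resetMs };
  };
}
```

With points: 5 and window: 10000, the sixth call from one connection inside 10 seconds is rejected, while a second connection still has its full allowance.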
Global rate limiting with Redis
For distributed rate limiting across all instances, use createRateLimit from the extensions package with the beforeExecute hook:
import { createMessage, LiveError } from 'svelte-realtime/server';
import { createRedisClient } from 'svelte-adapter-uws-extensions/redis';
import { createRateLimit } from 'svelte-adapter-uws-extensions/redis/ratelimit';
const redis = createRedisClient();
const limiter = createRateLimit(redis, { points: 30, interval: 10000 });
export const message = createMessage({
async beforeExecute(ws, rpcPath) {
const { allowed, resetMs } = await limiter.consume(ws);
if (!allowed)
throw new LiveError('RATE_LIMITED', `Retry in ${Math.ceil(resetMs / 1000)}s`);
}
});
beforeExecute runs before every RPC handler. If it throws, the RPC is rejected without executing. This gives you a single enforcement point for all live functions.
Circuit breaker
Wrap stream init functions with live.breaker() to fail fast when a backend (database, external API) is unhealthy:
import { live } from 'svelte-realtime/server';
// Assumes createBreaker and db are already in scope; their imports are not shown in this snippet.
const dbBreaker = createBreaker({ threshold: 5, resetMs: 30000 });
export const items = live.stream('items',
live.breaker({ breaker: dbBreaker, fallback: [] }, async (ctx) => {
return db.items.list();
})
);
| Option | Description |
|---|---|
| breaker | A breaker instance created with createBreaker() |
| fallback | Value returned when the breaker is open (prevents errors from reaching clients) |
| threshold | Consecutive failures before the breaker trips (default: 5) |
| resetMs | Milliseconds before the breaker probes for recovery (default: 30000) |
When the breaker trips, the stream returns the fallback value instead of calling the init function. Once the probe succeeds, normal operation resumes.
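The trip, fallback, and probe cycle can be sketched as a small state machine. This is a sketch under assumptions, not the createBreaker() implementation: createBreakerSketch, guarded, the allow / success / failure method names, and the choice to return the fallback on every failure are hypothetical.

```javascript
// Sketch only: consecutive-failure circuit breaker with a timed recovery probe.
function createBreakerSketch({ threshold = 5, resetMs = 30000, now = Date.now } = {}) {
  let failures = 0;    // consecutive failures seen while closed
  let openedAt = null; // null means closed; otherwise the time the breaker last tripped

  return {
    // Closed breakers always allow; open breakers allow a probe once resetMs has elapsed.
    allow() {
      return openedAt === null || now() - openedAt >= resetMs;
    },
    // Any success closes the breaker and clears the failure count.
    success() {
      failures = 0;
      openedAt = null;
    },
    // A failed probe re-opens immediately; otherwise trip at the threshold.
    failure() {
      failures += 1;
      if (openedAt !== null || failures >= threshold) openedAt = now();
    },
  };
}

// Wraps an async init function: fail fast with the fallback while the breaker is open.
async function guarded(breaker, fallback, init) {
  if (!breaker.allow()) return fallback;
  try {
    const value = await init();
    breaker.success();
    return value;
  } catch {
    breaker.failure();
    return fallback; // assumption: failures are hidden behind the fallback
  }
}
```

This sketch does not limit how many probes run concurrently once resetMs elapses; a production breaker would typically let only one through.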
Prometheus metrics
Opt-in metrics for RPC calls, streams, and cron jobs:
import { createMetrics } from 'svelte-adapter-uws-extensions/prometheus';
import { live } from 'svelte-realtime/server';
const metrics = createMetrics({ prefix: 'myapp_' });
live.metrics(metrics);
Available metrics:
| Metric | Type | Description |
|---|---|---|
| svelte_realtime_rpc_total | counter | Total RPC calls |
| svelte_realtime_rpc_duration_seconds | histogram | RPC execution time |
| svelte_realtime_rpc_errors_total | counter | RPC errors |
| svelte_realtime_stream_subscriptions | gauge | Active stream subscriptions |
| svelte_realtime_cron_total | counter | Cron job executions |
| svelte_realtime_cron_errors_total | counter | Cron job errors |
Webhooks
Bridge external HTTP webhooks into your pub/sub topics using live.webhook(). This turns incoming HTTP requests (from Stripe, GitHub, etc.) into stream events.
// src/live/integrations.js
import { live } from 'svelte-realtime/server';
export const stripeEvents = live.webhook('payments', {
verify({ body, headers }) {
return stripe.webhooks.constructEvent(body, headers['stripe-signature'], webhookSecret);
},
transform(event) {
if (event.type === 'payment_intent.succeeded') {
return { event: 'created', data: event.data.object };
}
return null; // ignore other event types
}
});
Use the handler in a SvelteKit endpoint:
// src/routes/api/stripe/+server.js
import { stripeEvents } from '$live/integrations';
export async function POST({ request, platform }) {
const body = await request.text();
const headers = Object.fromEntries(request.headers);
const result = await stripeEvents.handle({ body, headers, platform });
return new Response(result.body, { status: result.status });
}
The flow is: HTTP request → verify() (authenticate the webhook) → transform() (map to a stream event) → ctx.publish() under the hood. If transform returns null, the event is silently ignored.
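The verify, transform, publish flow can be sketched as a plain function pipeline. This is a rough sketch, not the live.webhook() implementation: createWebhookSketch is a hypothetical name, the status codes and response bodies are assumptions, handle is synchronous here for brevity, and platform is any object with a publish(topic, event, data) method.

```javascript
// Sketch only: HTTP payload -> verify -> transform -> publish.
function createWebhookSketch(topic, { verify, transform }) {
  return {
    handle({ body, headers, platform }) {
      let event;
      try {
        // verify() authenticates the request and returns the parsed event, or throws.
        event = verify({ body, headers });
      } catch {
        return { status: 400, body: 'invalid signature' }; // assumed status code
      }

      // transform() maps the provider event to { event, data }, or null to ignore it.
      const mapped = transform(event);
      if (mapped === null) return { status: 200, body: 'ignored' };

      platform.publish(topic, mapped.event, mapped.data);
      return { status: 200, body: 'ok' };
    },
  };
}
```

Acknowledging ignored events with a success status is a design choice in this sketch: most webhook providers retry on non-2xx responses, so rejecting event types you do not care about would only cause redelivery.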
Signals
Point-to-point ephemeral messaging. Send a signal to a specific user without broadcasting to a topic. Useful for WebRTC signaling, notifications, direct nudges, and any case where only one user should receive the message.
Sending signals (server)
Use ctx.signal(userId, event, data) from any live() function:
// Server: send a signal
const handler = live(async (ctx, targetUserId, offer) => {
ctx.signal(targetUserId, 'call:offer', offer);
});
Receiving signals (client)
Use onSignal from svelte-realtime/client to listen for incoming signals:
// Client: receive signals
import { onSignal } from 'svelte-realtime/client';
const unsub = onSignal(currentUser.id, (event, data) => {
if (event === 'call:offer') showIncomingCall(data);
});
Enabling signal delivery
Enable signal delivery in your open hook:
import { enableSignals } from 'svelte-realtime/server';
export function open(ws) { enableSignals(ws); }
live.notify(target, event, data) is the fire-and-forget counterpart to live.push. Returns Promise<void>, never rejects in normal operation, and validation throws synchronously for programming errors. Use live.push when you need a reply, live.notify when you do not.
Signals are ephemeral - they are not stored or replayed. If the target user is offline, the signal is lost.
Under the hood, ctx.publish and ctx.platform delegate to the adapter’s platform API. See svelte-adapter-uws for the full platform reference.