The ctx Object

Every live() function receives ctx as its first argument. It contains the connection, user data, and publishing utilities.

Properties

| Field | Type | Description |
| --- | --- | --- |
| ctx.user | object | Whatever your upgrade() returned |
| ctx.ws | WebSocket | The raw WebSocket connection |
| ctx.platform | object | The adapter platform API |
| ctx.cursor | string \| null | Cursor from a loadMore() call |

Methods

ctx.publish(topic, event, data)

Send an event to all clients subscribed to topic.

ctx.publish('messages', 'created', { id: 1, text: 'hello' });

ctx.throttle(topic, event, data, ms)

Publish at most once per ms milliseconds. The first call is sent immediately; later calls within the interval are stored, and the last stored value is sent when the interval expires.

export const updatePosition = live(async (ctx, x, y) => {
  ctx.throttle('cursors', 'update', { key: ctx.user.id, x, y }, 50);
});
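To make the timing concrete, here is a minimal sketch of the leading-plus-trailing behaviour described above. It is not the library's implementation: createThrottle, publish, and schedule are hypothetical names, and the assumption that a trailing send starts a fresh interval is mine.

```javascript
// Conceptual sketch only: not svelte-realtime's implementation.
// `publish` and `schedule` are hypothetical injection points.
function createThrottle(publish, ms, schedule = setTimeout) {
  let timerActive = false; // true while an interval is running
  let pending = null;      // latest payload stored during the interval

  function onIntervalEnd() {
    if (pending === null) {
      timerActive = false; // nothing was stored, so go idle
      return;
    }
    const { data } = pending;
    pending = null;
    publish(data);               // trailing send of the last stored value
    schedule(onIntervalEnd, ms); // assumption: a trailing send starts a new interval
  }

  return function throttled(data) {
    if (!timerActive) {
      timerActive = true;
      publish(data);             // first call goes out immediately
      schedule(onIntervalEnd, ms);
    } else {
      pending = { data };        // later calls overwrite each other
    }
  };
}

// Usage with a fake scheduler so the behaviour can be observed without waiting.
const sent = [];
const timers = [];
const throttled = createThrottle((d) => sent.push(d), 50, (fn) => timers.push(fn));

throttled({ x: 1 }); // sent immediately
throttled({ x: 2 }); // stored
throttled({ x: 3 }); // replaces { x: 2 }
timers.shift()();    // simulate the 50 ms interval expiring
console.log(sent);   // [ { x: 1 }, { x: 3 } ]
```

The intermediate value { x: 2 } is never sent, which is the desired outcome for high-frequency updates such as cursor positions.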

ctx.debounce(topic, event, data, ms)

Wait for ms milliseconds of silence before publishing. Each call resets the timer.

export const saveSearch = live(async (ctx, query) => {
  ctx.debounce('search:' + ctx.user.id, 'set', { query }, 300);
});
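For comparison with throttling, the reset-on-every-call behaviour can be sketched as follows. This is an illustration under assumptions, not the library's code: createDebounce, publish, schedule, and cancel are hypothetical names.

```javascript
// Conceptual sketch only: not svelte-realtime's implementation.
// `publish`, `schedule`, and `cancel` are hypothetical injection points.
function createDebounce(publish, ms, schedule = setTimeout, cancel = clearTimeout) {
  let handle = null; // handle of the currently pending timer, if any
  let latest;        // payload from the most recent call

  return function debounced(data) {
    latest = data;
    if (handle !== null) cancel(handle); // each call resets the timer
    handle = schedule(() => {
      handle = null;
      publish(latest); // fires only after `ms` milliseconds without another call
    }, ms);
  };
}

// Usage with a fake timer table so the quiet period can be simulated.
const published = [];
const pendingTimers = new Map();
let nextId = 1;
const fakeSchedule = (fn) => {
  const id = nextId++;
  pendingTimers.set(id, fn);
  return id;
};
const fakeCancel = (id) => pendingTimers.delete(id);

const saveSearch = createDebounce((d) => published.push(d), 300, fakeSchedule, fakeCancel);
saveSearch({ query: 's' });
saveSearch({ query: 'sv' });
saveSearch({ query: 'svelte' }); // only this call's timer is still pending
for (const fn of [...pendingTimers.values()]) fn(); // simulate 300 ms of silence
console.log(published); // [ { query: 'svelte' } ]
```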

ctx.signal(userId, event, data)

Point-to-point message to a specific user. Only that user receives it.

export const nudge = live(async (ctx, targetUserId) => {
  ctx.signal(targetUserId, 'nudge', { from: ctx.user.name });
});

ctx.batch(messages)

Publish multiple messages in a single call:

export const reset = live(async (ctx, boardId) => {
  ctx.batch([
    { topic: `board:${boardId}`, event: 'set', data: [] },
    { topic: `board:${boardId}:presence`, event: 'set', data: [] }
  ]);
});

Rate limiting

Per-function rate limiting

Use live.rateLimit() to apply a sliding window rate limiter to a single function. If a client exceeds the limit, the call throws a rate-limit error.

export const sendMessage = live.rateLimit({ points: 5, window: 10000 }, async (ctx, text) => {
  const msg = await db.messages.insert({ userId: ctx.user.id, text });
  ctx.publish('messages', 'created', msg);
  return msg;
});

The config object takes two fields:

| Option | Description |
| --- | --- |
| points | Maximum number of calls allowed within the window |
| window | Sliding window size in milliseconds |

In this example, each client can call sendMessage at most 5 times per 10 seconds. The rate limiter tracks calls per WebSocket connection, so different users have independent limits.
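A sliding window limiter with these two options can be sketched in a few lines. This is a conceptual illustration, not how live.rateLimit() is implemented: createSlidingWindowLimiter and tryConsume are hypothetical names, and the string keys stand in for WebSocket connections.

```javascript
// Conceptual sketch only: not svelte-realtime's implementation.
function createSlidingWindowLimiter({ points, window }) {
  const calls = new Map(); // key (e.g. a connection) -> timestamps of recent calls

  return function tryConsume(key, now = Date.now()) {
    // Keep only the calls that are still inside the window ending at `now`.
    const recent = (calls.get(key) ?? []).filter((t) => now - t < window);
    const allowed = recent.length < points;
    if (allowed) recent.push(now);
    calls.set(key, recent);
    return allowed;
  };
}

const tryConsume = createSlidingWindowLimiter({ points: 5, window: 10000 });
const results = [];
for (let i = 0; i < 6; i++) results.push(tryConsume('conn-a', 1000 + i));
console.log(results);                     // [ true, true, true, true, true, false ]
console.log(tryConsume('conn-b', 1006));  // true: a different connection has its own count
console.log(tryConsume('conn-a', 11000)); // true: the call at t=1000 has left the window
```

Unlike a fixed window that resets at set boundaries, the window here always covers the most recent 10 seconds, so a burst that straddles a boundary cannot exceed the limit.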

Global rate limiting with Redis

For distributed rate limiting across all instances, use createRateLimit from the extensions package with the beforeExecute hook:

import { createMessage, LiveError } from 'svelte-realtime/server';
import { createRedisClient } from 'svelte-adapter-uws-extensions/redis';
import { createRateLimit } from 'svelte-adapter-uws-extensions/redis/ratelimit';

const redis = createRedisClient();
const limiter = createRateLimit(redis, { points: 30, interval: 10000 });

export const message = createMessage({
  async beforeExecute(ws, rpcPath) {
    const { allowed, resetMs } = await limiter.consume(ws);
    if (!allowed)
      throw new LiveError('RATE_LIMITED', `Retry in ${Math.ceil(resetMs / 1000)}s`);
  }
});

beforeExecute runs before every RPC handler. If it throws, the RPC is rejected without executing. This gives you a single enforcement point for all live functions.


Circuit breaker

Wrap stream init functions with live.breaker() to fail fast when a backend (database, external API) is unhealthy:

import { live } from 'svelte-realtime/server';
// createBreaker also needs to be imported; its module path is not shown here.

const dbBreaker = createBreaker({ threshold: 5, resetMs: 30000 });

export const items = live.stream('items',
  live.breaker({ breaker: dbBreaker, fallback: [] }, async (ctx) => {
    return db.items.list();
  })
);

| Option | Description |
| --- | --- |
| breaker | A breaker instance created with createBreaker() |
| fallback | Value returned when the breaker is open (prevents errors from reaching clients) |
| threshold | Consecutive failures before the breaker trips (default: 5) |
| resetMs | Milliseconds before the breaker probes for recovery (default: 30000) |

When the breaker trips, the stream returns the fallback value instead of calling the init function. Once the probe succeeds, normal operation resumes.
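The trip, fallback, and probe cycle can be illustrated with a small state machine. This is a sketch under assumptions, not the library's createBreaker(): createSketchBreaker and withBreaker are hypothetical names, the wrapped function is synchronous to keep the example short, and rethrowing failures while the breaker is closed is my assumption.

```javascript
// Conceptual sketch only: `createSketchBreaker` is a hypothetical stand-in,
// not the library's createBreaker().
function createSketchBreaker({ threshold = 5, resetMs = 30000 } = {}) {
  let failures = 0;    // consecutive failures seen so far
  let openedAt = null; // time the breaker tripped, or null while closed

  return {
    canCall(now = Date.now()) {
      if (openedAt === null) return true; // closed: normal operation
      return now - openedAt >= resetMs;   // open: allow a probe after resetMs
    },
    recordSuccess() {
      failures = 0;
      openedAt = null; // probe succeeded: close again
    },
    recordFailure(now = Date.now()) {
      failures += 1;
      if (failures >= threshold) openedAt = now; // trip, or re-open after a failed probe
    },
  };
}

// Wrap a function so that it returns `fallback` while the breaker is open.
function withBreaker(breaker, fallback, fn, clock = Date.now) {
  return (...args) => {
    if (!breaker.canCall(clock())) return fallback;
    try {
      const value = fn(...args);
      breaker.recordSuccess();
      return value;
    } catch (err) {
      breaker.recordFailure(clock());
      throw err; // assumption: failures surface while the breaker is still closed
    }
  };
}

// Usage with a fake clock and a backend that starts out unhealthy.
let now = 0;
let healthy = false;
const list = withBreaker(
  createSketchBreaker({ threshold: 2, resetMs: 1000 }),
  [],
  () => {
    if (!healthy) throw new Error('db down');
    return ['item'];
  },
  () => now
);

for (let i = 0; i < 2; i++) {
  try { list(); } catch {} // two consecutive failures trip the breaker
}
console.log(list()); // [] (fallback; the backend is not called)
now = 1000;
healthy = true;
console.log(list()); // [ 'item' ] (probe succeeded, breaker closed)
```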


Prometheus metrics

Opt-in metrics for RPC calls, streams, and cron jobs:

import { createMetrics } from 'svelte-adapter-uws-extensions/prometheus';
import { live } from 'svelte-realtime/server';

const metrics = createMetrics({ prefix: 'myapp_' });
live.metrics(metrics);

Available metrics:

| Metric | Type | Description |
| --- | --- | --- |
| svelte_realtime_rpc_total | counter | Total RPC calls |
| svelte_realtime_rpc_duration_seconds | histogram | RPC execution time |
| svelte_realtime_rpc_errors_total | counter | RPC errors |
| svelte_realtime_stream_subscriptions | gauge | Active stream subscriptions |
| svelte_realtime_cron_total | counter | Cron job executions |
| svelte_realtime_cron_errors_total | counter | Cron job errors |

Webhooks

Bridge external HTTP webhooks into your pub/sub topics using live.webhook(). This turns incoming HTTP requests (from Stripe, GitHub, etc.) into stream events.

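// src/live/integrations.js
import Stripe from 'stripe';
import { live } from 'svelte-realtime/server';

// Assumption: the Stripe keys are provided through these environment variables.
const stripe = new Stripe(process.env.STRIPE_SECRET_KEY);
const webhookSecret = process.env.STRIPE_WEBHOOK_SECRET;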

export const stripeEvents = live.webhook('payments', {
  verify({ body, headers }) {
    return stripe.webhooks.constructEvent(body, headers['stripe-signature'], webhookSecret);
  },
  transform(event) {
    if (event.type === 'payment_intent.succeeded') {
      return { event: 'created', data: event.data.object };
    }
    return null; // ignore other event types
  }
});

Use the handler in a SvelteKit endpoint:

// src/routes/api/stripe/+server.js
import { stripeEvents } from '$live/integrations';

export async function POST({ request, platform }) {
  const body = await request.text();
  const headers = Object.fromEntries(request.headers);
  const result = await stripeEvents.handle({ body, headers, platform });
  return new Response(result.body, { status: result.status });
}

The flow is: HTTP request → verify() (authenticate the webhook) → transform() (map to a stream event) → ctx.publish() under the hood. If transform returns null, the event is silently ignored.
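The three-step flow can be modelled as a plain function, which may help when reasoning about what each callback is responsible for. This is a sketch, not the behaviour of live.webhook(): handleWebhook, the x-signature header, and the 400/200 status codes are all hypothetical.

```javascript
// Conceptual sketch only: `handleWebhook` and the status codes are hypothetical,
// not the behaviour of live.webhook().
function handleWebhook({ topic, verify, transform, publish }, request) {
  let event;
  try {
    event = verify(request); // step 1: authenticate the webhook
  } catch {
    return { status: 400, body: 'invalid signature' };
  }
  const mapped = transform(event); // step 2: map to a stream event
  if (mapped !== null) {
    publish(topic, mapped.event, mapped.data); // step 3: fan out to subscribers
  }
  return { status: 200, body: 'ok' }; // ignored events are still acknowledged
}

// Usage with a fake signature check and an in-memory publish.
const published = [];
const config = {
  topic: 'payments',
  verify: ({ body, headers }) => {
    if (headers['x-signature'] !== 'valid') throw new Error('bad signature');
    return JSON.parse(body);
  },
  transform: (event) =>
    event.type === 'payment_intent.succeeded'
      ? { event: 'created', data: event.data.object }
      : null,
  publish: (topic, event, data) => published.push({ topic, event, data }),
};

const ok = handleWebhook(config, {
  body: JSON.stringify({ type: 'payment_intent.succeeded', data: { object: { id: 'pi_1' } } }),
  headers: { 'x-signature': 'valid' },
});
console.log(ok.status, published.length); // 200 1
```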

Signals

Point-to-point ephemeral messaging. Send a signal to a specific user without broadcasting to a topic. Useful for WebRTC signaling, notifications, direct nudges, and any case where only one user should receive the message.

Sending signals (server)

Use ctx.signal(userId, event, data) from any live() function:

// Server: send a signal
const handler = live(async (ctx, targetUserId, offer) => {
  ctx.signal(targetUserId, 'call:offer', offer);
});

Receiving signals (client)

Use onSignal from svelte-realtime/client to listen for incoming signals:

// Client: receive signals
import { onSignal } from 'svelte-realtime/client';

const unsub = onSignal(currentUser.id, (event, data) => {
  if (event === 'call:offer') showIncomingCall(data);
});

Enabling signal delivery

Enable signal delivery in your open hook:

import { enableSignals } from 'svelte-realtime/server';
export function open(ws) { enableSignals(ws); }

Signals are ephemeral: they are not stored or replayed. If the target user is offline, the signal is lost.
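The consequence of not storing signals can be seen in a tiny in-memory model. It is only an illustration of the delivery semantics, not how svelte-realtime routes signals: onSignalSketch and signalSketch are hypothetical names.

```javascript
// Conceptual sketch only: an in-memory stand-in, not how svelte-realtime routes signals.
const listeners = new Map(); // userId -> Set of callbacks for connected clients

function onSignalSketch(userId, callback) {
  if (!listeners.has(userId)) listeners.set(userId, new Set());
  listeners.get(userId).add(callback);
  return () => listeners.get(userId)?.delete(callback); // unsubscribe
}

function signalSketch(userId, event, data) {
  const targets = listeners.get(userId);
  if (!targets || targets.size === 0) return false; // offline: dropped, nothing is queued
  for (const cb of targets) cb(event, data);
  return true;
}

const received = [];
signalSketch('alice', 'nudge', { from: 'bob' }); // alice is offline: lost
const unsub = onSignalSketch('alice', (event, data) => received.push({ event, data }));
signalSketch('alice', 'call:offer', { sdp: '...' }); // delivered
unsub();
console.log(received.map((r) => r.event)); // [ 'call:offer' ]
```

If a message must survive the recipient being offline, persist it yourself and deliver it when the user reconnects.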


Under the hood, ctx.publish and ctx.platform delegate to the adapter’s platform API. See svelte-adapter-uws for the full platform reference.
