Streams - live.stream()

A stream gives the client a Svelte store with initial data, then pushes live updates whenever someone publishes to that topic.

Basic usage

// src/live/chat.js
import { live } from 'svelte-realtime/server';

export const messages = live.stream('messages', async (ctx) => {
  return db.messages.latest(50);
}, { merge: 'crud', key: 'id' });

<script>
  import { messages } from '$live/chat';
</script>

{#each $messages as msg (msg.id)}
  <p>{msg.text}</p>
{/each}

Store lifecycle

A stream store's value is always either your data type or undefined (while loading). It is never replaced by an error object - errors and connection status live on separate reactive stores, so a network failure can never crash your UI:

| Property | Type | Description |
| --- | --- | --- |
| $store | T \| undefined | Your data. On failure, the last loaded value is preserved |
| store.error | Readable<RpcError \| null> | Current error, or null when healthy |
| store.status | Readable<'loading' \| 'connected' \| 'reconnecting' \| 'error'> | Connection status |

{#if $messages === undefined}
  <p>Loading...</p>
{:else}
  {#each $messages as msg (msg.id)}
    <p>{msg.text}</p>
  {/each}
{/if}

For full error and reconnection UI, see Errors - Stream error and status stores.

Publishing updates

Use ctx.publish() from any live() function to push events to all subscribers:

export const sendMessage = live(async (ctx, text) => {
  const msg = await db.messages.insert({ text, userId: ctx.user.id });
  ctx.publish('messages', 'created', msg);
  return msg;
});
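
To make the effect of a published event concrete, here is a minimal sketch of how a keyed 'crud' merge could fold events into the subscriber's array. It is not the library's implementation: applyCrudEvent is a hypothetical helper, and the 'updated' and 'deleted' event names are assumptions (only 'created' appears above).

```javascript
// Hypothetical helper: folds one published event into the current list.
// Assumes event names 'created' | 'updated' | 'deleted' and a key field (default 'id').
function applyCrudEvent(items, event, data, key = 'id') {
  switch (event) {
    case 'created':
      // Replace an existing row with the same key, otherwise append.
      return items.some((it) => it[key] === data[key])
        ? items.map((it) => (it[key] === data[key] ? data : it))
        : [...items, data];
    case 'updated':
      return items.map((it) => (it[key] === data[key] ? { ...it, ...data } : it));
    case 'deleted':
      return items.filter((it) => it[key] !== data[key]);
    default:
      return items; // unknown events leave the list untouched
  }
}

let list = [{ id: 1, text: 'hi' }];
list = applyCrudEvent(list, 'created', { id: 2, text: 'hello' });
list = applyCrudEvent(list, 'updated', { id: 1, text: 'hi there' });
list = applyCrudEvent(list, 'deleted', { id: 2 });
console.log(list); // [ { id: 1, text: 'hi there' } ]
```

Returning a new array on every event, rather than mutating in place, is what lets a Svelte store notice the change.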

Dynamic topics

Use a function as the first argument for per-entity streams:

export const roomMessages = live.stream(
  (ctx, roomId) => 'chat:' + roomId,
  async (ctx, roomId) => db.messages.forRoom(roomId),
  { merge: 'crud', key: 'id' }
);

<script>
  import { roomMessages } from '$live/rooms';
  let { data } = $props();
  const messages = roomMessages(data.roomId);
</script>

Same arguments return the same cached store instance.
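
The caching behaviour can be pictured as a memoized factory. The sketch below is an illustration only: memoizeByArgs and the JSON-based cache key are assumptions, not the library's actual caching strategy.

```javascript
// Hypothetical sketch of an argument-keyed store cache.
// createStore stands in for whatever builds the real stream store.
function memoizeByArgs(createStore) {
  const cache = new Map();
  return (...args) => {
    const cacheKey = JSON.stringify(args); // assumes JSON-serializable args
    if (!cache.has(cacheKey)) cache.set(cacheKey, createStore(...args));
    return cache.get(cacheKey);
  };
}

const roomStore = memoizeByArgs((roomId) => ({ topic: 'chat:' + roomId }));
console.log(roomStore('lobby') === roomStore('lobby')); // true
console.log(roomStore('lobby') === roomStore('dev'));   // false
```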

Topic registry: defineTopics

Centralize topic strings so the SQL trigger, the stream definition, and any out-of-tree producer all reference one source of truth. defineTopics(map) validates the map at boot and exposes __patterns for tooling.

// src/lib/topics.js
import { defineTopics } from 'svelte-realtime/server';

export const TOPICS = defineTopics({
  audit: (orgId) => `audit:${orgId}`,
  security: (orgId) => `security:${orgId}`,
  systemNotices: 'system:notices'
});

// Stream definitions reference the registry:
live.stream((ctx, orgId) => TOPICS.audit(orgId), loadAudit, { merge: 'crud' });

// Tooling reads __patterns to derive shapes:
TOPICS.__patterns;
// => { audit: 'audit:{arg0}', security: 'security:{arg0}', systemNotices: 'system:notices' }

Map values can be static strings or (...args) => string functions. The helper validates non-empty strings and rejects reserved names (__patterns, __definedTopics). Pattern derivation calls each function with sentinel placeholders ({arg0}, {arg1}, …) and falls back to '<dynamic>' if the function throws on placeholders or returns a non-string.
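
The derivation rule can be sketched in a few lines. This is a hypothetical re-implementation for illustration, not the library's code; in particular, using the function's declared parameter count (fn.length) to decide how many placeholders to pass is an assumption.

```javascript
// Hypothetical re-implementation of the pattern derivation described above.
function derivePatterns(map) {
  const patterns = {};
  for (const [name, value] of Object.entries(map)) {
    if (typeof value === 'string') {
      patterns[name] = value; // static topics are their own pattern
      continue;
    }
    try {
      // One sentinel per declared parameter: '{arg0}', '{arg1}', ...
      const sentinels = Array.from({ length: value.length }, (_, i) => `{arg${i}}`);
      const result = value(...sentinels);
      patterns[name] = typeof result === 'string' ? result : '<dynamic>';
    } catch {
      patterns[name] = '<dynamic>'; // the function threw on placeholder input
    }
  }
  return patterns;
}

const derived = derivePatterns({
  audit: (orgId) => `audit:${orgId}`,
  systemNotices: 'system:notices',
  shard: (id) => `shard:${id.toFixed(0)}` // throws: a string has no toFixed
});
console.log(derived.audit);         // audit:{arg0}
console.log(derived.systemNotices); // system:notices
console.log(derived.shard);         // <dynamic>
```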

Build-time registry check

When the Vite plugin sees a defineTopics({...}) call anywhere under src/, it builds a registry of patterns and validates string-literal topics passed to live.stream(...) and live.channel(...) against it. A literal that does not match any registered pattern triggers a one-shot warning per (file, topic) pair:

[svelte-realtime] src/live/feed.js: live.stream topic 'mistyped-topic' is not in
your TOPICS registry. Either add it to defineTopics({...}) or call TOPICS.<name>(...)
instead of passing a string literal.

The check covers two value shapes per topic key:

  • static string literals (e.g. feed: 'feed:notices')
  • arrow functions that return a template literal (e.g. audit: (orgId) => `audit:${orgId}`)

For arrow-returns, template interpolations match .+ - so 'audit:org-123' and 'audit:any-id' both pass against the audit pattern. Function references and other dynamic value shapes are silently skipped at parse time - the warning only fires for literal topics under a confidently parsed registry. If your project does not call defineTopics at all, the check is disabled.
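
As a rough illustration of the matching rule, the sketch below converts a derived pattern into a regular expression in which each placeholder matches .+. The helpers patternToRegExp and isRegistered are hypothetical names, and the plugin's real parser may work differently.

```javascript
// Hypothetical matcher: turns a pattern such as 'audit:{arg0}' into a RegExp
// where every placeholder matches one or more characters (.+).
function patternToRegExp(pattern) {
  const source = pattern
    .split(/\{arg\d+\}/) // literal segments between placeholders
    .map((part) => part.replace(/[.*+?^${}()|[\]\\]/g, '\\$&')) // escape regex metacharacters
    .join('.+');
  return new RegExp(`^${source}$`);
}

function isRegistered(topic, patterns) {
  return Object.values(patterns).some((p) => patternToRegExp(p).test(topic));
}

const registry = { audit: 'audit:{arg0}', systemNotices: 'system:notices' };
console.log(isRegistered('audit:org-123', registry));  // true
console.log(isRegistered('system:notices', registry)); // true
console.log(isRegistered('mistyped-topic', registry)); // false
console.log(isRegistered('audit:', registry));         // false, .+ needs at least one character
```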

Stream options

| Option | Default | Description |
| --- | --- | --- |
| merge | 'crud' | How events are applied. See Merge Strategies |
| key | 'id' | Key field for crud mode |
| prepend | false | Prepend new items instead of appending |
| max | 50 / 0 | Max items to keep. Defaults to 50 for latest, 0 (unlimited) for crud. Oldest items are dropped when exceeded |
| replay | false | Enable seq-based replay for gap-free reconnection |
| args | - | Standard Schema (Zod / Valibot / custom) applied to dynamic-topic factory args before the loader runs. Rejects with VALIDATION on bad input |
| transform | - | (ctx, item) => item projection applied to BOTH initial-load data and every live publish. Use for select-trimming, computed fields |
| coalesceBy | - | (event, data) => key per-subscriber coalesce key. Same-key publishes within the same flush tick collapse to the latest. Cannot combine with volatile |
| volatile | false | Skip per-topic seq stamping. Use for ephemeral channels (cursor moves, typing indicators) where replay / gap-fill is meaningless. Cannot combine with coalesceBy |
| staleAfterMs | - | If no publish lands within this window, fire the onStale hook (or onError) so the stream can refresh its initial state. Watchdog primitive for “stream went quiet” recovery |
| invalidateOn | - | string \| string[] of topics; on ctx.publish(invalidatedTopic, 'invalidate', ...), the stream re-runs its init loader and re-broadcasts the result. The companion ctx.invalidate(topic) is the publish-side trigger |
| onError | - | (err, ctx) => void per-stream error hook. Receives errors from init loader, transform, filter, access, and lifecycle hooks. Defaults to the global onError registry |
| classOfService | - | Named load-shedding class registered via live.admission(). New subscribes shed under matching pressure. Existing subscribers unaffected. See API -> live.admission |
| onSubscribe | - | (ctx, topic) => void fires when a client subscribes |
| onUnsubscribe | - | (ctx, topic, remainingSubscribers) => void fires when a client disconnects. The third arg is the count of remaining subscribers on the topic AFTER this client leaves. Use === 0 to tear down upstream feeds, close database listeners, etc. |
| filter / access | - | Per-connection publish filter (see Access control) |
| delta | - | Delta sync config (see Delta sync) |
| version | - | Schema version (see Schema evolution) |
| migrate | - | Migration functions (see Schema evolution) |
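
To illustrate what coalesceBy means in practice, here is a minimal sketch of a per-subscriber buffer in which same-key publishes collapse to the latest one. createCoalescingBuffer is a hypothetical helper, and the explicit flush() call stands in for the library's flush tick; the real scheduling is not shown here.

```javascript
// Hypothetical per-subscriber buffer: publishes with the same coalesce key
// inside one flush tick collapse to the most recent one.
function createCoalescingBuffer(coalesceBy) {
  const pending = new Map(); // key -> latest { event, data }
  return {
    push(event, data) {
      pending.set(coalesceBy(event, data), { event, data });
    },
    flush() {
      const batch = [...pending.values()];
      pending.clear();
      return batch;
    }
  };
}

const cursorBuffer = createCoalescingBuffer((event, data) => `${event}:${data.userId}`);
cursorBuffer.push('move', { userId: 'a', x: 1 });
cursorBuffer.push('move', { userId: 'b', x: 5 });
cursorBuffer.push('move', { userId: 'a', x: 2 }); // replaces the first 'a' entry
const batch = cursorBuffer.flush();
console.log(batch.length);  // 2
console.log(batch[0].data); // { userId: 'a', x: 2 }
```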

Lifecycle hooks

export const presence = live.stream('room:lobby', async (ctx) => {
  return db.presence.list('lobby');
}, {
  merge: 'presence',
  onSubscribe(ctx, topic) {
    ctx.publish(topic, 'join', { key: ctx.user.id, name: ctx.user.name });
  },
  onUnsubscribe(ctx, topic, remainingSubscribers) {
    ctx.publish(topic, 'leave', { key: ctx.user.id });
    if (remainingSubscribers === 0) {
      // last client out: close the upstream Kafka consumer for this room
      upstream.unsubscribe(`kafka:room:lobby`);
    }
  }
});

The remainingSubscribers count is computed AFTER this client’s leave is applied. 0 means this was the last subscriber; the topic is effectively idle and any upstream feed wired during onSubscribe should be torn down here.
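
The "count after leave" rule can be sketched with a small reference-counting registry. createTopicRegistry is a hypothetical stand-in for the library's internal bookkeeping, shown only to make the ordering explicit.

```javascript
// Hypothetical subscriber registry illustrating the "count AFTER leave" rule.
function createTopicRegistry() {
  const subscribers = new Map(); // topic -> Set of client ids
  return {
    subscribe(topic, clientId) {
      if (!subscribers.has(topic)) subscribers.set(topic, new Set());
      subscribers.get(topic).add(clientId);
    },
    // Returns the number of subscribers left once clientId has been removed.
    unsubscribe(topic, clientId) {
      const set = subscribers.get(topic);
      if (!set) return 0;
      set.delete(clientId);
      if (set.size === 0) subscribers.delete(topic); // topic is now idle
      return set.size;
    }
  };
}

const topics = createTopicRegistry();
topics.subscribe('room:lobby', 'alice');
topics.subscribe('room:lobby', 'bob');
const afterAlice = topics.unsubscribe('room:lobby', 'alice');
const afterBob = topics.unsubscribe('room:lobby', 'bob');
console.log(afterAlice); // 1
console.log(afterBob);   // 0, last client out
```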

Staleness watchdog + topic-driven invalidation

export const orgFeed = live.stream(
  (ctx) => `org:${ctx.user.orgId}:feed`,
  async (ctx) => db.feed.forOrg(ctx.user.orgId),
  {
    merge: 'crud',
    staleAfterMs: 30_000,
    invalidateOn: ['org:settings:updated'],
    onError(err, ctx) {
      logger.warn({ err, topic: ctx.topic }, 'stream error');
    }
  }
);

// From any handler that mutates the org's settings:
export const updateSettings = live(async (ctx, patch) => {
  await db.orgs.update(ctx.user.orgId, patch);
  ctx.invalidate(`org:${ctx.user.orgId}:feed`);  // triggers init re-run on every subscriber
});

staleAfterMs is a watchdog: if no publish lands on the topic within the window, the stream re-runs its init loader and re-broadcasts the result. This is useful when the publish path is fragile (an external feed, a derived computation) and a stream that goes quiet should heal itself. invalidateOn triggers the same reload, but in response to a known publish event on another topic - mutations on the trigger topic invalidate the listening stream.
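
The watchdog idea can be sketched independently of the library. The helper below is hypothetical (createStalenessWatchdog, touch, and check are illustrative names) and uses an injectable clock so it can be driven without real timers.

```javascript
// Hypothetical watchdog with an injectable clock.
function createStalenessWatchdog(staleAfterMs, onStale, now = Date.now) {
  let lastPublishAt = now();
  return {
    // Call whenever a publish lands on the topic.
    touch() {
      lastPublishAt = now();
    },
    // Call periodically; fires onStale once the quiet window has elapsed.
    check() {
      if (now() - lastPublishAt >= staleAfterMs) {
        lastPublishAt = now(); // restart the window after a reload
        onStale();
        return true;
      }
      return false;
    }
  };
}

let clock = 0;
let reloads = 0;
const watchdog = createStalenessWatchdog(30_000, () => reloads++, () => clock);
clock = 10_000; watchdog.touch(); // a publish arrives
clock = 35_000; watchdog.check(); // 25 s of silence: still fresh
clock = 40_000; watchdog.check(); // 30 s of silence: reload
console.log(reloads); // 1
```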

Reconnection

When the WebSocket reconnects, streams automatically refetch initial data and resubscribe. The store keeps showing stale data during the refetch - it does not reset to undefined.

Pagination

See Client APIs - Pagination.

SSR hydration

See Client APIs - SSR Hydration.


Channels

Ephemeral pub/sub topics with no database initialization. Clients subscribe and receive live events immediately - no init function, no database query.

// src/live/typing.js
import { live } from 'svelte-realtime/server';

export const typing = live.channel('typing:lobby', { merge: 'presence' });

<script>
  import { typing } from '$live/typing';
</script>

{#each $typing as user (user.key)}
  <span>{user.data.name} is typing...</span>
{/each}

Dynamic channels work the same way:

export const cursors = live.channel(
  (ctx, docId) => 'cursors:' + docId,
  { merge: 'cursor' }
);

live.channel() is essentially live.stream() without an init function. The store starts as an empty value matching the merge strategy (empty array for presence/cursor/crud, null for set) and only receives data through published events.


Derived streams

Server-side computed streams that recompute when any source topic publishes. You list the source topics and provide a computation function; clients then subscribe as they would to a normal stream.

import { live } from 'svelte-realtime/server';

export const dashboardStats = live.derived(
  ['orders', 'inventory', 'users'],
  async () => {
    return {
      totalOrders: await db.orders.count(),
      lowStock: await db.inventory.lowStockCount(),
      activeUsers: await db.users.activeCount()
    };
  },
  { debounce: 500 }
);

On the client, derived streams work like regular streams:

<script>
  import { dashboardStats } from '$live/dashboard';
</script>

<p>Orders: {$dashboardStats?.totalOrders}</p>

Dynamic derived streams

When source topics depend on runtime arguments (e.g. an org ID, a room ID), pass a source factory function instead of a static array. The factory receives the same args the client passes at subscribe time:

export const orgStats = live.derived(
  (orgId) => [`memberships:${orgId}`, `emails:${orgId}`, `audit:${orgId}`],
  async (ctx, orgId) => {
    const [members, emails, auditCount] = await Promise.all([
      db.query('SELECT count(*) FROM memberships WHERE org_id = $1', [orgId]),
      db.query('SELECT count(*) FROM emails WHERE org_id = $1', [orgId]),
      db.query('SELECT count(*) FROM audit_log WHERE org_id = $1', [orgId])
    ]);
    return { members, emails, auditCount };
  },
  { debounce: 100 }
);

On the client, dynamic derived streams are called like functions:

<script>
  import { orgStats } from '$live/dashboard';
  let { orgId } = $props();
  const stats = orgStats(orgId);
</script>

<p>Members: {$stats?.members}</p>

Each unique set of args creates an independent instance with its own source subscriptions. Instances are created when the first subscriber connects and cleaned up when the last subscriber disconnects.

The dynamic compute function receives ctx.user from the subscribing client, so guards like if (orgId !== ctx.user.organization_id) throw new LiveError('FORBIDDEN') work the same as in regular stream handlers.

Activation

Call _activateDerived(platform) in your open hook to enable derived stream listeners:

import { _activateDerived } from 'svelte-realtime/server';

export function open(ws, { platform }) {
  _activateDerived(platform);
}

Without this call, derived streams still serve their initial SSR data but never receive live updates. In dev mode, a console warning is emitted when a client subscribes to a derived stream and _activateDerived has not been called.

live.derived() accepts the following options:

| Option | Default | Description |
| --- | --- | --- |
| merge | 'set' | Merge strategy for the derived topic |
| debounce | 0 | Debounce recomputation by this many milliseconds |
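
The debounce option can be pictured as follows: several source publishes inside one window trigger a single recomputation. This is a sketch under assumptions, not the library's scheduler; createDebouncedRecompute and the injectable timers object are hypothetical.

```javascript
// Hypothetical debounced recompute; timers are injectable so the sketch can be tested.
const realTimers = {
  set: (fn, ms) => setTimeout(fn, ms),
  clear: (handle) => clearTimeout(handle)
};

function createDebouncedRecompute(recompute, debounceMs, timers = realTimers) {
  let handle = null;
  // Call this whenever any source topic publishes.
  return function onSourcePublish() {
    if (handle !== null) timers.clear(handle); // restart the quiet period
    handle = timers.set(() => {
      handle = null;
      recompute();
    }, debounceMs);
  };
}

// Fake timers: remember the scheduled callback and run it manually.
let scheduled = null;
const fakeTimers = {
  set: (fn) => { scheduled = fn; return 1; },
  clear: () => { scheduled = null; }
};

let recomputeCount = 0;
const onPublish = createDebouncedRecompute(() => recomputeCount++, 500, fakeTimers);
onPublish(); // 'orders' publishes
onPublish(); // 'inventory' publishes inside the same window
onPublish(); // 'users' publishes inside the same window
scheduled(); // the debounce window elapses
console.log(recomputeCount); // 1
```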

Effects

Server-side reactive side effects that fire when source topics publish. Fire-and-forget - no topic, no client subscription.

// src/live/notifications.js
import { live } from 'svelte-realtime/server';

export const orderNotifications = live.effect(['orders'], async (event, data, platform) => {
  if (event === 'created') {
    await email.send(data.userEmail, 'Order confirmed', templates.orderConfirm(data));
  }
});

Effects are server-only. They fire whenever a matching topic publishes and cannot be subscribed to from the client. Use them for sending emails, logging analytics, syncing to external services, or any side effect that should happen in response to a topic event.


Aggregates

Real-time incremental aggregations. Reducers run on each event, maintaining O(1) state. The aggregate publishes its state to the output topic on every event, and clients subscribe to that output topic as a regular stream.

// src/live/stats.js
import { live } from 'svelte-realtime/server';

export const orderStats = live.aggregate('orders', {
  count: { init: () => 0, reduce: (acc, event) => event === 'created' ? acc + 1 : acc },
  total: { init: () => 0, reduce: (acc, event, data) => event === 'created' ? acc + data.amount : acc },
  avg: { compute: (state) => state.count > 0 ? state.total / state.count : 0 }
}, { topic: 'order-stats' });

Each field in the aggregate config can have:

  • init - returns the initial value for the accumulator
  • reduce - called on every event with (acc, event, data) → returns the new accumulator value
  • compute - a derived field computed from the current state of all other fields (not called with events directly)
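
How the three field kinds interact can be sketched in plain JavaScript. createAggregate below is a hypothetical runner written for illustration; it mirrors the field shapes listed above but says nothing about how the library stores or publishes state.

```javascript
// Hypothetical runner for the field kinds listed above: init, reduce, compute.
function createAggregate(fields) {
  const state = {};
  for (const [name, field] of Object.entries(fields)) {
    if (field.init) state[name] = field.init();
  }
  return {
    // Fold one event into every reducer, then return a snapshot with computed fields.
    apply(event, data) {
      for (const [name, field] of Object.entries(fields)) {
        if (field.reduce) state[name] = field.reduce(state[name], event, data);
      }
      return this.snapshot();
    },
    snapshot() {
      const out = { ...state };
      for (const [name, field] of Object.entries(fields)) {
        if (field.compute) out[name] = field.compute(state);
      }
      return out;
    }
  };
}

const orderStatsSketch = createAggregate({
  count: { init: () => 0, reduce: (acc, event) => (event === 'created' ? acc + 1 : acc) },
  total: { init: () => 0, reduce: (acc, event, data) => (event === 'created' ? acc + data.amount : acc) },
  avg: { compute: (state) => (state.count > 0 ? state.total / state.count : 0) }
});
orderStatsSketch.apply('created', { amount: 10 });
const latest = orderStatsSketch.apply('created', { amount: 30 });
console.log(latest); // { count: 2, total: 40, avg: 20 }
```

Each event touches only the accumulators, which is why the state stays constant in size no matter how many events have been seen.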

For time-windowed aggregations (lifetime, tumbling, sliding) plus combineSum / combineMax / combineMin / combineCounts / combineMerge reducers, snapshot hydration, and the full options reference, see Aggregates.


Gates

Conditional stream activation. On the server, a predicate controls whether the client subscribes. On the client, .when() manages the subscription lifecycle.

// src/live/beta.js
import { live } from 'svelte-realtime/server';

export const betaFeed = live.gate(
  (ctx) => ctx.user?.flags?.includes('beta'),
  live.stream('beta-feed', async (ctx) => db.betaFeed.latest(50), { merge: 'latest' })
);

<script>
  import { betaFeed } from '$live/beta';

  import { writable } from 'svelte/store';

  const tabActive = writable(true);
  const feed = betaFeed.when(tabActive);
</script>

{#if $feed !== undefined}
  {#each $feed as item (item.id)}
    <p>{item.title}</p>
  {/each}
{/if}

When the predicate returns false, the server responds with a graceful no-op (no error, no subscription). The client store stays undefined. .when() accepts a boolean, a Svelte store, or a getter function. When given a store, it subscribes/unsubscribes reactively as the value changes. Getter functions are evaluated once at subscribe time; for reactivity with Svelte 5 $state, wrap in $derived or pass a store.
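
The difference between the three condition shapes can be sketched like this. watchCondition and tinyWritable are hypothetical helpers written for this example (tinyWritable follows the Svelte store contract so the sketch needs no imports); the real .when() internals may differ.

```javascript
// Hypothetical normalizer for the three condition shapes .when() accepts.
function watchCondition(condition, onChange) {
  if (typeof condition === 'boolean') {
    onChange(condition);
    return () => {};
  }
  if (condition && typeof condition.subscribe === 'function') {
    // Stores call the callback immediately and again on every change.
    return condition.subscribe((value) => onChange(Boolean(value)));
  }
  if (typeof condition === 'function') {
    onChange(Boolean(condition())); // getter: read once, no reactivity
    return () => {};
  }
  throw new TypeError('condition must be a boolean, a store, or a getter');
}

// Minimal store following the Svelte store contract (subscribe returns an unsubscriber).
function tinyWritable(value) {
  const subs = new Set();
  return {
    set(next) { value = next; subs.forEach((fn) => fn(value)); },
    subscribe(fn) { subs.add(fn); fn(value); return () => subs.delete(fn); }
  };
}

const seen = [];
const tabActive = tinyWritable(true);
const stop = watchCondition(tabActive, (active) => seen.push(active));
tabActive.set(false); // reactive: the stream would unsubscribe here
stop();
tabActive.set(true);  // ignored after stop()

let flag = false;
watchCondition(() => flag, (active) => seen.push(active)); // getter read once
flag = true; // not observed
console.log(seen); // [ true, false, false ]
```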


Pipes

Composable server-side stream transforms. Apply filter, sort, limit, and join operations to both initial data and live events.

// src/live/notifications.js
import { live, pipe } from 'svelte-realtime/server';

export const myNotifications = pipe(
  live.stream('notifications', async (ctx) => {
    return db.notifications.forUser(ctx.user.id);
  }, { merge: 'crud', key: 'id' }),

  pipe.filter((ctx, item) => !item.dismissed),
  pipe.sort('createdAt', 'desc'),
  pipe.limit(20),
  pipe.join('authorId', async (id) => db.users.getName(id), 'authorName')
);

| Transform | Initial data | Live events |
| --- | --- | --- |
| pipe.filter(predicate) | Filters the array | Initial data only |
| pipe.sort(field, dir) | Sorts the array | Initial data only |
| pipe.limit(n) | Slices to N items | Initial data only |
| pipe.join(field, resolver, as) | Enriches each item | Initial data only |

Piped functions preserve all stream metadata. The client receives already-transformed data. Import pipe from 'svelte-realtime/server' alongside live.
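
What the filter, sort, and limit stages do to the initial data can be sketched with plain array operations. filterStage, sortStage, limitStage, and runStages are hypothetical stand-ins, not the library's pipe implementation, and the asynchronous join stage is left out for brevity.

```javascript
// Hypothetical stand-ins for the filter / sort / limit stages, applied to initial data.
const filterStage = (predicate) => (ctx, items) => items.filter((item) => predicate(ctx, item));
const sortStage = (field, dir = 'asc') => (ctx, items) =>
  [...items].sort((a, b) => {
    const order = a[field] < b[field] ? -1 : a[field] > b[field] ? 1 : 0;
    return dir === 'desc' ? -order : order;
  });
const limitStage = (n) => (ctx, items) => items.slice(0, n);

// Run the stages left to right over the loader's result.
function runStages(ctx, items, stages) {
  return stages.reduce((acc, stage) => stage(ctx, acc), items);
}

const initial = [
  { id: 1, createdAt: 100, dismissed: false },
  { id: 2, createdAt: 300, dismissed: true },
  { id: 3, createdAt: 200, dismissed: false }
];
const visible = runStages({}, initial, [
  filterStage((ctx, item) => !item.dismissed),
  sortStage('createdAt', 'desc'),
  limitStage(20)
]);
console.log(visible.map((n) => n.id)); // [ 3, 1 ]
```

Stage order matters: limiting before sorting would keep the first N loaded items rather than the N newest.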


Under the hood, streams use the adapter’s pub/sub system. See svelte-adapter-uws plugins for replay, presence, and throttle plugins.
