Streams - live.stream()

A stream gives the client a Svelte store with initial data, then pushes live updates whenever someone publishes to that topic.

Basic usage

// src/live/chat.js
import { live } from 'svelte-realtime/server';

export const messages = live.stream('messages', async (ctx) => {
  return db.messages.latest(50);
}, { merge: 'crud', key: 'id' });

<script>
  import { messages } from '$live/chat';
</script>

{#each $messages as msg (msg.id)}
  <p>{msg.text}</p>
{/each}

Store lifecycle

A stream store has three states:

| Value | Meaning |
| --- | --- |
| undefined | Loading - initial fetch in progress |
| Data (array, object, etc.) | Loaded and receiving live updates |
| { error: RpcError } | Initial fetch failed |

{#if $messages === undefined}
  <p>Loading...</p>
{:else if $messages?.error}
  <p>Error: {$messages.error.message}</p>
{:else}
  {#each $messages as msg (msg.id)}
    <p>{msg.text}</p>
  {/each}
{/if}
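
The same three states can be told apart in plain JavaScript, for example when reading the store value outside a template. This is a minimal sketch: streamState is a hypothetical helper name, not part of svelte-realtime, and it assumes that successfully loaded data never carries a top-level error property of its own.

```javascript
// Hypothetical helper (not part of svelte-realtime): classify a stream
// store value into one of the three documented states.
function streamState(value) {
  if (value === undefined) return 'loading';
  // Assumption: only a failed initial fetch yields a plain object with `error`.
  if (value !== null && typeof value === 'object' && !Array.isArray(value) && 'error' in value) {
    return 'error';
  }
  return 'loaded';
}

console.log(streamState(undefined));                            // loading
console.log(streamState([{ id: 1, text: 'hi' }]));              // loaded
console.log(streamState({ error: new Error('fetch failed') })); // error
```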

Publishing updates

Use ctx.publish() from any live() function to push events to all subscribers:

export const sendMessage = live(async (ctx, text) => {
  const msg = await db.messages.insert({ text, userId: ctx.user.id });
  ctx.publish('messages', 'created', msg);
  return msg;
});
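
To make the effect of a published event concrete, here is a rough sketch of how a crud-style merge could apply events to the array held by the store. It illustrates the idea and is not the library's implementation: applyCrudEvent is a hypothetical name, and the 'updated' and 'deleted' event names are assumptions (only 'created' appears above).

```javascript
// Hypothetical illustration of a 'crud' merge keyed by `key`.
// Only 'created' comes from the docs; 'updated' and 'deleted' are assumed names.
function applyCrudEvent(items, event, data, { key = 'id', prepend = false } = {}) {
  const index = items.findIndex((item) => item[key] === data[key]);
  switch (event) {
    case 'created':
      if (index !== -1) {
        // Already present: replace instead of duplicating.
        return items.map((item, i) => (i === index ? data : item));
      }
      return prepend ? [data, ...items] : [...items, data];
    case 'updated':
      // Shallow-merge the new fields into the existing item, if any.
      return index === -1
        ? items
        : items.map((item, i) => (i === index ? { ...item, ...data } : item));
    case 'deleted':
      return items.filter((item) => item[key] !== data[key]);
    default:
      return items; // Unknown events leave the list unchanged.
  }
}

let messages = [{ id: 1, text: 'hello' }];
messages = applyCrudEvent(messages, 'created', { id: 2, text: 'world' });
messages = applyCrudEvent(messages, 'updated', { id: 1, text: 'hello!' });
messages = applyCrudEvent(messages, 'deleted', { id: 2 });
console.log(messages); // [ { id: 1, text: 'hello!' } ]
```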

Dynamic topics

Use a function as the first argument for per-entity streams:

export const roomMessages = live.stream(
  (ctx, roomId) => 'chat:' + roomId,
  async (ctx, roomId) => db.messages.forRoom(roomId),
  { merge: 'crud', key: 'id' }
);

<script>
  import { roomMessages } from '$live/rooms';
  let { data } = $props();
  const messages = roomMessages(data.roomId);
</script>

Same arguments return the same cached store instance.
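
The caching behaviour can be pictured as memoization keyed by the call arguments. The sketch below is an assumption about the general technique, not the library's actual code; cachedFactory and roomMessagesDemo are hypothetical names, and the cache key assumes JSON-serializable arguments.

```javascript
// Hypothetical sketch: memoize a store factory by its serialized arguments.
function cachedFactory(createStore) {
  const cache = new Map();
  return (...args) => {
    const cacheKey = JSON.stringify(args); // Assumes JSON-serializable arguments.
    if (!cache.has(cacheKey)) {
      cache.set(cacheKey, createStore(...args));
    }
    return cache.get(cacheKey);
  };
}

// Stand-in for a dynamic stream; real stores come from '$live/...' imports.
const roomMessagesDemo = cachedFactory((roomId) => ({ topic: 'chat:' + roomId }));

console.log(roomMessagesDemo('lobby') === roomMessagesDemo('lobby')); // true
console.log(roomMessagesDemo('lobby') === roomMessagesDemo('dev'));   // false
```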

Stream options

| Option | Default | Description |
| --- | --- | --- |
| merge | 'crud' | How events are applied. See Merge Strategies |
| key | 'id' | Key field for crud mode |
| prepend | false | Prepend new items instead of appending |
| max | 50 | Max items to keep (latest mode) |
| replay | false | Enable seq-based replay for gap-free reconnection |
| onSubscribe | - | Callback when a client subscribes |
| onUnsubscribe | - | Callback when a client disconnects |
| filter / access | - | Per-connection publish filter |

Lifecycle hooks

export const presence = live.stream('room:lobby', async (ctx) => {
  return db.presence.list('lobby');
}, {
  merge: 'presence',
  onSubscribe(ctx, topic) {
    ctx.publish(topic, 'join', { key: ctx.user.id, name: ctx.user.name });
  },
  onUnsubscribe(ctx, topic) {
    ctx.publish(topic, 'leave', { key: ctx.user.id });
  }
});

Reconnection

When the WebSocket reconnects, streams automatically refetch initial data and resubscribe. The store keeps showing stale data during the refetch - it does not reset to undefined.

Pagination

See Client APIs - Pagination.

SSR hydration

See Client APIs - SSR Hydration.


Channels

Ephemeral pub/sub topics with no database initialization. Clients subscribe and receive live events immediately - no init function, no database query.

// src/live/typing.js
import { live } from 'svelte-realtime/server';

export const typing = live.channel('typing:lobby', { merge: 'presence' });

<script>
  import { typing } from '$live/typing';
</script>

{#each $typing as user (user.key)}
  <span>{user.data.name} is typing...</span>
{/each}

Dynamic channels work the same way:

export const cursors = live.channel(
  (ctx, docId) => 'cursors:' + docId,
  { merge: 'cursor' }
);

live.channel() is essentially live.stream() without an init function. The store starts as an empty value matching the merge strategy (empty array for presence/cursor/crud, null for set) and only receives data through published events.
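
The starting values described above can be summarized in a small lookup. initialChannelValue is a hypothetical helper written for illustration; it covers only the strategies named in the paragraph and deliberately refuses to guess for any other.

```javascript
// Hypothetical helper mirroring the initial values described above.
function initialChannelValue(merge) {
  switch (merge) {
    case 'presence':
    case 'cursor':
    case 'crud':
      return []; // List-like strategies start empty.
    case 'set':
      return null; // A single value that has not been published yet.
    default:
      // Other strategies are not covered by the text above.
      throw new Error('No documented initial value for merge strategy: ' + merge);
  }
}

console.log(initialChannelValue('presence')); // []
console.log(initialChannelValue('set'));      // null
```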


Derived streams

Server-side computed streams that recompute when any source topic publishes. List the source topics and provide a computation function; clients then subscribe as they would to a normal stream.

import { live } from 'svelte-realtime/server';

export const dashboardStats = live.derived(
  ['orders', 'inventory', 'users'],
  async () => {
    return {
      totalOrders: await db.orders.count(),
      lowStock: await db.inventory.lowStockCount(),
      activeUsers: await db.users.activeCount()
    };
  },
  { debounce: 500 }
);

On the client, derived streams work like regular streams:

<script>
  import { dashboardStats } from '$live/dashboard';
</script>

<p>Orders: {$dashboardStats?.totalOrders}</p>

Call _activateDerived(platform) in your open hook to enable derived stream listeners:

import { _activateDerived } from 'svelte-realtime/server';

export function open(ws, { platform }) {
  _activateDerived(platform);
}

Options for live.derived():

| Option | Default | Description |
| --- | --- | --- |
| merge | 'set' | Merge strategy for the derived topic |
| debounce | 0 | Debounce recomputation by this many milliseconds |
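
The debounce option exists because several source topics often publish in quick succession. The sketch below shows trailing-edge debouncing in general terms; whether svelte-realtime debounces on the trailing edge is an assumption, and createDebouncedRecompute and its injectable timers argument are hypothetical names used only for this illustration.

```javascript
// Hypothetical sketch of trailing-edge debouncing for recomputation.
// `timers` is injectable so the behaviour can be exercised without real delays.
const realTimers = {
  set: (fn, ms) => setTimeout(fn, ms),
  clear: (id) => clearTimeout(id)
};

function createDebouncedRecompute(recompute, delayMs, timers = realTimers) {
  let pending = null;
  return function onSourceEvent() {
    if (delayMs <= 0) {
      recompute(); // A delay of 0 means recompute on every event.
      return;
    }
    // Restart the quiet period each time a source topic publishes.
    if (pending !== null) timers.clear(pending);
    pending = timers.set(() => {
      pending = null;
      recompute();
    }, delayMs);
  };
}

let recomputeCount = 0;
const onEvent = createDebouncedRecompute(() => { recomputeCount += 1; }, 500);
onEvent(); // e.g. 'orders' publishes
onEvent(); // e.g. 'inventory' publishes shortly after
// Once 500 ms pass without further events, recompute runs a single time.
```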

Effects

Server-side reactive side effects that fire when source topics publish. Fire-and-forget - no topic, no client subscription.

// src/live/notifications.js
import { live } from 'svelte-realtime/server';

export const orderNotifications = live.effect(['orders'], async (event, data, platform) => {
  if (event === 'created') {
    await email.send(data.userEmail, 'Order confirmed', templates.orderConfirm(data));
  }
});

Effects are server-only. They fire whenever a matching topic publishes and cannot be subscribed to from the client. Use them for sending emails, logging analytics, syncing to external services, or any side effect that should happen in response to a topic event.


Aggregates

Real-time incremental aggregations. Reducers run on each event, maintaining O(1) state. The aggregate publishes its state to the output topic on every event, and clients subscribe to that output topic as a regular stream.

// src/live/stats.js
import { live } from 'svelte-realtime/server';

export const orderStats = live.aggregate('orders', {
  count: { init: () => 0, reduce: (acc, event) => event === 'created' ? acc + 1 : acc },
  total: { init: () => 0, reduce: (acc, event, data) => event === 'created' ? acc + data.amount : acc },
  avg: { compute: (state) => state.count > 0 ? state.total / state.count : 0 }
}, { topic: 'order-stats' });

Each field in the aggregate config can have:

  • init - returns the initial value for the accumulator
  • reduce - called on every event with (acc, event, data); returns the new accumulator value
  • compute - a derived field computed from the current state of all other fields (not called with events directly)
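
How the three kinds of field interact can be sketched in a few lines of plain JavaScript. This is an illustrative model, not the library's implementation; createAggregate, apply, and snapshot are hypothetical names, while the field definitions are the ones from the example above.

```javascript
// Hypothetical model of how init / reduce / compute fields could be evaluated.
function createAggregate(fields) {
  const state = {};
  // Seed every reducer field with its initial accumulator.
  for (const [name, field] of Object.entries(fields)) {
    if (field.init) state[name] = field.init();
  }
  return {
    apply(event, data) {
      // Each reducer only sees its own accumulator and the incoming event.
      for (const [name, field] of Object.entries(fields)) {
        if (field.reduce) state[name] = field.reduce(state[name], event, data);
      }
      return this.snapshot();
    },
    snapshot() {
      const result = { ...state };
      // Computed fields are derived from the reduced state, not from events.
      for (const [name, field] of Object.entries(fields)) {
        if (field.compute) result[name] = field.compute(state);
      }
      return result;
    }
  };
}

const orderStatsDemo = createAggregate({
  count: { init: () => 0, reduce: (acc, event) => (event === 'created' ? acc + 1 : acc) },
  total: { init: () => 0, reduce: (acc, event, data) => (event === 'created' ? acc + data.amount : acc) },
  avg: { compute: (state) => (state.count > 0 ? state.total / state.count : 0) }
});

orderStatsDemo.apply('created', { amount: 10 });
console.log(orderStatsDemo.apply('created', { amount: 30 })); // { count: 2, total: 40, avg: 20 }
```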

Gates

Conditional stream activation. On the server, a predicate controls whether the client subscribes. On the client, .when() manages the subscription lifecycle.

// src/live/beta.js
import { live } from 'svelte-realtime/server';

export const betaFeed = live.gate(
  (ctx) => ctx.user?.flags?.includes('beta'),
  live.stream('beta-feed', async (ctx) => db.betaFeed.latest(50), { merge: 'latest' })
);

<script>
  import { betaFeed } from '$live/beta';

  import { writable } from 'svelte/store';

  const tabActive = writable(true);
  const feed = betaFeed.when(tabActive);
</script>

{#if $feed !== undefined}
  {#each $feed as item (item.id)}
    <p>{item.title}</p>
  {/each}
{/if}

When the predicate returns false, the server responds with a graceful no-op (no error, no subscription). The client store stays undefined. .when() accepts a boolean, a Svelte store, or a getter function. When given a store, it subscribes/unsubscribes reactively as the value changes. Getter functions are evaluated once at subscribe time; for reactivity with Svelte 5 $state, wrap in $derived or pass a store.
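
The difference between the three accepted input types can be illustrated with a helper that reads a condition's current value. This is a sketch of the general idea only: readCondition and fakeStore are hypothetical, and the store branch relies on the Svelte store contract, in which subscribe calls its callback synchronously with the current value and returns an unsubscribe function.

```javascript
// Hypothetical sketch: read the current value of a condition that may be
// a boolean, a getter function, or a Svelte-style store.
function readCondition(condition) {
  if (typeof condition === 'boolean') return condition;
  if (typeof condition === 'function') return Boolean(condition()); // Evaluated once.
  if (condition && typeof condition.subscribe === 'function') {
    let current;
    // A store pushes its current value synchronously on subscribe.
    const unsubscribe = condition.subscribe((value) => { current = value; });
    unsubscribe();
    return Boolean(current);
  }
  throw new TypeError('Expected a boolean, a getter function, or a store');
}

// Minimal stand-in for a store such as writable(true) from 'svelte/store'.
const fakeStore = { subscribe(run) { run(true); return () => {}; } };

console.log(readCondition(false));      // false
console.log(readCondition(() => true)); // true
console.log(readCondition(fakeStore));  // true
```

A reactive implementation would keep the store subscription open instead of reading once, which is why a store can toggle the stream later while a getter cannot.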


Pipes

Composable server-side stream transforms. Apply filter, sort, limit, and join operations to both initial data and live events.

// src/live/notifications.js
import { live, pipe } from 'svelte-realtime/server';

export const myNotifications = pipe(
  live.stream('notifications', async (ctx) => {
    return db.notifications.forUser(ctx.user.id);
  }, { merge: 'crud', key: 'id' }),

  pipe.filter((ctx, item) => !item.dismissed),
  pipe.sort('createdAt', 'desc'),
  pipe.limit(20),
  pipe.join('authorId', async (id) => db.users.getName(id), 'authorName')
);

| Transform | Initial data | Live events |
| --- | --- | --- |
| pipe.filter(predicate) | Filters the array | Initial data only |
| pipe.sort(field, dir) | Sorts the array | Initial data only |
| pipe.limit(n) | Slices to N items | Initial data only |
| pipe.join(field, resolver, as) | Enriches each item | Initial data only |

Piped functions preserve all stream metadata. The client receives already-transformed data. Import pipe from 'svelte-realtime/server' alongside live.
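
For intuition, the effect of filter, sort, and limit on the initial data is roughly what the following plain-JavaScript sketch does to an array. transformInitialData and its option names are hypothetical and exist only for this illustration; the join step is omitted because it is asynchronous.

```javascript
// Hypothetical plain-JavaScript equivalent of filter -> sort -> limit
// applied to an initial data array.
function transformInitialData(items, { predicate, sortField, direction = 'asc', limit }) {
  const sign = direction === 'desc' ? -1 : 1;
  return items
    .filter(predicate) // filter() copies, so the input array is not mutated by sort()
    .sort((a, b) => (a[sortField] < b[sortField] ? -sign : a[sortField] > b[sortField] ? sign : 0))
    .slice(0, limit);
}

const notifications = [
  { id: 1, createdAt: 100, dismissed: false },
  { id: 2, createdAt: 300, dismissed: true },
  { id: 3, createdAt: 200, dismissed: false },
  { id: 4, createdAt: 400, dismissed: false }
];

const visible = transformInitialData(notifications, {
  predicate: (item) => !item.dismissed,
  sortField: 'createdAt',
  direction: 'desc',
  limit: 2
});

console.log(visible.map((item) => item.id)); // [ 4, 3 ]
```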


Under the hood, streams use the adapter’s pub/sub system. See svelte-adapter-uws plugins for replay, presence, and throttle plugins.
