Streams - live.stream()
A stream gives the client a Svelte store with initial data, then pushes live updates whenever someone publishes to that topic.
Basic usage
// src/live/chat.js
import { live } from 'svelte-realtime/server';
export const messages = live.stream('messages', async (ctx) => {
return db.messages.latest(50);
}, { merge: 'crud', key: 'id' });

<script>
import { messages } from '$live/chat';
</script>
{#each $messages as msg (msg.id)}
<p>{msg.text}</p>
{/each}

Store lifecycle
A stream store has three states:
| Value | Meaning |
|---|---|
| undefined | Loading - initial fetch in progress |
| Data (array, object, etc.) | Loaded and receiving live updates |
| { error: RpcError } | Initial fetch failed |
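
The three states in the table can be told apart with a small helper. This is a minimal sketch, not part of svelte-realtime: `streamState` is a hypothetical name, and it assumes that loaded data is never itself a plain object with an `error` field.

```javascript
// Hypothetical helper (not a svelte-realtime API): classify a stream store value.
function streamState(value) {
  // undefined means the initial fetch is still in progress
  if (value === undefined) return 'loading';
  // a non-array object carrying an `error` field means the initial fetch failed
  if (value !== null && typeof value === 'object' && !Array.isArray(value) && 'error' in value) {
    return 'error';
  }
  // anything else (array, object, null, ...) is treated as loaded data
  return 'loaded';
}
```

In a component this could back the same three-way branch shown in the template, for example `streamState($messages) === 'loading'`.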
{#if $messages === undefined}
<p>Loading...</p>
{:else if $messages?.error}
<p>Error: {$messages.error.message}</p>
{:else}
{#each $messages as msg (msg.id)}
<p>{msg.text}</p>
{/each}
{/if}

Publishing updates
Use ctx.publish() from any live() function to push events to all subscribers:
export const sendMessage = live(async (ctx, text) => {
const msg = await db.messages.insert({ text, userId: ctx.user.id });
ctx.publish('messages', 'created', msg);
return msg;
});

Dynamic topics
Use a function as the first argument for per-entity streams:
export const roomMessages = live.stream(
(ctx, roomId) => 'chat:' + roomId,
async (ctx, roomId) => db.messages.forRoom(roomId),
{ merge: 'crud', key: 'id' }
);

<script>
import { roomMessages } from '$live/rooms';
let { data } = $props();
const messages = roomMessages(data.roomId);
</script>

Calling the stream function with the same arguments returns the same cached store instance.
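
To illustrate what "same arguments, same instance" means, here is a minimal sketch of argument-keyed caching. It is an assumption about the general technique, not the library's actual implementation: `cachedFactory` and `roomStore` are hypothetical names, and keying on `JSON.stringify(args)` only works for JSON-serializable arguments.

```javascript
// Hypothetical sketch: memoize a store factory by its arguments.
function cachedFactory(createStore) {
  const cache = new Map();
  return (...args) => {
    // Assumption: arguments are JSON-serializable (strings, numbers, plain objects)
    const key = JSON.stringify(args);
    if (!cache.has(key)) cache.set(key, createStore(...args));
    return cache.get(key);
  };
}

// Stand-in for a dynamic stream: each room id maps to exactly one object
const roomStore = cachedFactory((roomId) => ({ topic: 'chat:' + roomId }));
```

With this shape, two components that both call `roomStore('lobby')` share one object, so only one subscription per topic is needed.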
Stream options
| Option | Default | Description |
|---|---|---|
| merge | 'crud' | How events are applied. See Merge Strategies |
| key | 'id' | Key field for crud mode |
| prepend | false | Prepend new items instead of appending |
| max | 50 | Max items to keep (latest mode) |
| replay | false | Enable seq-based replay for gap-free reconnection |
| onSubscribe | - | Callback when a client subscribes |
| onUnsubscribe | - | Callback when a client disconnects |
| filter / access | - | Per-connection publish filter |
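
As a rough illustration of how the merge, key, and prepend options could interact, the sketch below applies crud-style events to an array. It is not the library's code: `applyCrud` is a hypothetical function, and the `'updated'` and `'deleted'` event names are assumptions (only `'created'` appears on this page).

```javascript
// Hypothetical sketch of a crud-style merge; event names other than
// 'created' are assumptions, not confirmed svelte-realtime behavior.
function applyCrud(items, event, data, { key = 'id', prepend = false } = {}) {
  const index = items.findIndex((item) => item[key] === data[key]);
  switch (event) {
    case 'created':
      // Replace in place if the key already exists, otherwise add at the chosen end
      if (index !== -1) return items.map((item, i) => (i === index ? data : item));
      return prepend ? [data, ...items] : [...items, data];
    case 'updated':
      // Unknown keys leave the list unchanged
      return index === -1 ? items : items.map((item, i) => (i === index ? data : item));
    case 'deleted':
      return items.filter((item) => item[key] !== data[key]);
    default:
      return items;
  }
}
```

Each call returns a new array rather than mutating the input, which is what a Svelte store needs in order to notify its subscribers.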
Lifecycle hooks
export const presence = live.stream('room:lobby', async (ctx) => {
return db.presence.list('lobby');
}, {
merge: 'presence',
onSubscribe(ctx, topic) {
ctx.publish(topic, 'join', { key: ctx.user.id, name: ctx.user.name });
},
onUnsubscribe(ctx, topic) {
ctx.publish(topic, 'leave', { key: ctx.user.id });
}
});

Reconnection
When the WebSocket reconnects, streams automatically refetch initial data and resubscribe. The store keeps showing stale data during the refetch - it does not reset to undefined.
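
The stale-while-refetch behavior can be modeled in a few lines. This is a simplified sketch under stated assumptions, not the library's internals: `createStreamState` and its `refetching` flag are hypothetical, and the real store exposes only its value.

```javascript
// Hypothetical model of the reconnection behavior described above.
function createStreamState() {
  let value = undefined;   // undefined until the first fetch completes
  let refetching = false;
  return {
    get value() { return value; },
    get refetching() { return refetching; },
    // Called when a fetch (initial or after reconnect) completes
    load(data) { value = data; refetching = false; },
    // Called when the socket reconnects: the value is deliberately left untouched
    reconnect() { refetching = true; }
  };
}
```

Because `reconnect()` never resets the value, a template that branches on `undefined` shows its loading state only on the first load, not on every reconnect.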
Pagination
SSR hydration
See Client APIs - SSR Hydration.
Channels
Ephemeral pub/sub topics with no database initialization. Clients subscribe and receive live events immediately - no init function, no database query.
// src/live/typing.js
import { live } from 'svelte-realtime/server';
export const typing = live.channel('typing:lobby', { merge: 'presence' });

<script>
import { typing } from '$live/typing';
</script>
{#each $typing as user (user.key)}
<span>{user.data.name} is typing...</span>
{/each}

Dynamic channels work the same way:
export const cursors = live.channel(
(ctx, docId) => 'cursors:' + docId,
{ merge: 'cursor' }
);

live.channel() is essentially live.stream() without an init function. The store starts as an empty value matching the merge strategy (empty array for presence/cursor/crud, null for set) and only receives data through published events.
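
The starting values listed above can be written out as a lookup. This is a sketch only: `initialChannelValue` is a hypothetical name, and strategies not mentioned in the sentence above are deliberately left unhandled rather than guessed.

```javascript
// Hypothetical helper mirroring the initial values described above.
function initialChannelValue(merge) {
  switch (merge) {
    case 'presence':
    case 'cursor':
    case 'crud':
      return [];   // list-like strategies start empty
    case 'set':
      return null; // single-value strategy starts as null
    default:
      // Other strategies are not covered by the text, so this sketch refuses to guess
      throw new RangeError('initial value not described for merge strategy: ' + merge);
  }
}
```

A practical consequence is that a template iterating over a presence or cursor channel can use `{#each}` immediately, without a loading branch.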
Derived streams
Server-side computed streams that recompute when any source topic publishes. List the source topics and provide a computation function; clients then subscribe as they would to a normal stream.
import { live } from 'svelte-realtime/server';
export const dashboardStats = live.derived(
['orders', 'inventory', 'users'],
async () => {
return {
totalOrders: await db.orders.count(),
lowStock: await db.inventory.lowStockCount(),
activeUsers: await db.users.activeCount()
};
},
{ debounce: 500 }
);

On the client, derived streams work like regular streams:
<script>
import { dashboardStats } from '$live/dashboard';
</script>
<p>Orders: {$dashboardStats?.totalOrders}</p>

Call _activateDerived(platform) in your open hook to enable derived stream listeners:
import { _activateDerived } from 'svelte-realtime/server';
export function open(ws, { platform }) {
_activateDerived(platform);
}

| Option | Default | Description |
|---|---|---|
| merge | 'set' | Merge strategy for the derived topic |
| debounce | 0 | Debounce recomputation by this many milliseconds |
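
To show what the debounce option buys you, here is a minimal trailing-edge debounce sketch. It assumes the library waits for a quiet period before recomputing, which the table does not spell out. `debounceRecompute` and the injectable `timers` parameter are hypothetical and exist only to make the sketch testable.

```javascript
// Hypothetical sketch: collapse a burst of source publishes into one recompute.
function debounceRecompute(
  recompute,
  ms,
  timers = { set: (fn, delay) => setTimeout(fn, delay), clear: (h) => clearTimeout(h) }
) {
  let handle = null;
  return function onSourcePublish() {
    // debounce: 0 (the default) recomputes on every publish
    if (ms <= 0) { recompute(); return; }
    // Restart the timer on each publish so only the last one in a burst fires
    if (handle !== null) timers.clear(handle);
    handle = timers.set(() => { handle = null; recompute(); }, ms);
  };
}
```

With `{ debounce: 500 }`, three publishes in quick succession would trigger one set of database queries instead of three.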
Effects
Server-side reactive side effects that fire when source topics publish. Fire-and-forget - no topic, no client subscription.
// src/live/notifications.js
import { live } from 'svelte-realtime/server';
export const orderNotifications = live.effect(['orders'], async (event, data, platform) => {
if (event === 'created') {
await email.send(data.userEmail, 'Order confirmed', templates.orderConfirm(data));
}
});

Effects are server-only. They fire whenever a matching topic publishes and cannot be subscribed to from the client. Use them for sending emails, logging analytics, syncing to external services, or any side effect that should happen in response to a topic event.
Aggregates
Real-time incremental aggregations. Reducers run on each event, maintaining O(1) state. The aggregate publishes its state to the output topic on every event, and clients subscribe to that output topic as a regular stream.
// src/live/stats.js
import { live } from 'svelte-realtime/server';
export const orderStats = live.aggregate('orders', {
count: { init: () => 0, reduce: (acc, event) => event === 'created' ? acc + 1 : acc },
total: { init: () => 0, reduce: (acc, event, data) => event === 'created' ? acc + data.amount : acc },
avg: { compute: (state) => state.count > 0 ? state.total / state.count : 0 }
}, { topic: 'order-stats' });

Each field in the aggregate config can have:
- init - returns the initial value for the accumulator
- reduce - called on every event with (acc, event, data); returns the new accumulator value
- compute - a derived field computed from the current state of all other fields (not called with events directly)
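
The init / reduce / compute contract can be sketched as a tiny in-memory runner. This is an illustration of the field semantics described above, not the library's implementation. `createAggregate`, `apply`, and `snapshot` are hypothetical names.

```javascript
// Hypothetical runner for the field config shape described above.
function createAggregate(fields) {
  const state = {};
  // Seed every accumulator from its init()
  for (const [name, field] of Object.entries(fields)) {
    if (field.init) state[name] = field.init();
  }
  // Build the published view: accumulators plus computed fields
  function snapshot() {
    const out = { ...state };
    for (const [name, field] of Object.entries(fields)) {
      if (field.compute) out[name] = field.compute(out);
    }
    return out;
  }
  return {
    snapshot,
    // One constant-time pass over the fields per event, no history kept
    apply(event, data) {
      for (const [name, field] of Object.entries(fields)) {
        if (field.reduce) state[name] = field.reduce(state[name], event, data);
      }
      return snapshot();
    }
  };
}
```

Feeding it the `count` / `total` / `avg` config from the example and two `'created'` events with amounts 10 and 30 gives a count of 2, a total of 40, and an average of 20.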
Gates
Conditional stream activation. On the server, a predicate controls whether the client subscribes. On the client, .when() manages the subscription lifecycle.
// src/live/beta.js
import { live } from 'svelte-realtime/server';
export const betaFeed = live.gate(
(ctx) => ctx.user?.flags?.includes('beta'),
live.stream('beta-feed', async (ctx) => db.betaFeed.latest(50), { merge: 'latest' })
);

<script>
import { betaFeed } from '$live/beta';
import { writable } from 'svelte/store';
const tabActive = writable(true);
const feed = betaFeed.when(tabActive);
</script>
{#if $feed !== undefined}
{#each $feed as item (item.id)}
<p>{item.title}</p>
{/each}
{/if}

When the predicate returns false, the server responds with a graceful no-op (no error, no subscription). The client store stays undefined.

.when() accepts a boolean, a Svelte store, or a getter function. When given a store, it subscribes/unsubscribes reactively as the value changes. Getter functions are evaluated once at subscribe time; for reactivity with Svelte 5 $state, wrap in $derived or pass a store.
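
The difference between the three input kinds can be sketched as follows. This is an assumption-labeled model of the behavior described above, not the actual `.when()` implementation. `watchCondition` is a hypothetical name, and the store branch relies only on the standard Svelte store contract (`subscribe` calls back immediately and returns an unsubscribe function).

```javascript
// Hypothetical sketch: normalize a boolean, getter, or store into onChange calls.
function watchCondition(condition, onChange) {
  if (typeof condition === 'boolean') {
    onChange(condition);            // fixed value, reported once
    return () => {};
  }
  if (typeof condition === 'function') {
    onChange(Boolean(condition())); // getter: evaluated once, later changes are not seen
    return () => {};
  }
  if (condition && typeof condition.subscribe === 'function') {
    // store: reported on every change until the returned function is called
    return condition.subscribe((value) => onChange(Boolean(value)));
  }
  throw new TypeError('expected a boolean, a getter function, or a store');
}
```

Only the store branch keeps reacting after the first call, which is why a plain getter over changing state needs to be wrapped or replaced with a store.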
Pipes
Composable server-side stream transforms. Apply filter, sort, limit, and join operations to both initial data and live events.
// src/live/notifications.js
import { live, pipe } from 'svelte-realtime/server';
export const myNotifications = pipe(
live.stream('notifications', async (ctx) => {
return db.notifications.forUser(ctx.user.id);
}, { merge: 'crud', key: 'id' }),
pipe.filter((ctx, item) => !item.dismissed),
pipe.sort('createdAt', 'desc'),
pipe.limit(20),
pipe.join('authorId', async (id) => db.users.getName(id), 'authorName')
);

| Transform | Initial data | Live events |
|---|---|---|
| pipe.filter(predicate) | Filters the array | Initial data only |
| pipe.sort(field, dir) | Sorts the array | Initial data only |
| pipe.limit(n) | Slices to N items | Initial data only |
| pipe.join(field, resolver, as) | Enriches each item | Initial data only |
Piped functions preserve all stream metadata. The client receives already-transformed data. Import pipe from 'svelte-realtime/server' alongside live.
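
For intuition about what the client receives, the sketch below applies filter, sort, and limit to an initial array in that order. It is plain array code under stated assumptions, not the pipe implementation: `applyInitialTransforms` is a hypothetical function, its predicate takes only the item (the real `pipe.filter` predicate also receives `ctx`), and the async `pipe.join` step is left out.

```javascript
// Hypothetical sketch of filter -> sort -> limit over initial data.
function applyInitialTransforms(items, { filter, sortBy, dir = 'asc', limit } = {}) {
  // Filter first so sort and limit only see items that will be kept
  let out = filter ? items.filter(filter) : [...items];
  if (sortBy) {
    const sign = dir === 'desc' ? -1 : 1;
    out = [...out].sort((a, b) =>
      a[sortBy] < b[sortBy] ? -sign : a[sortBy] > b[sortBy] ? sign : 0
    );
  }
  // Limit last, so it keeps the top N after sorting
  if (limit !== undefined) out = out.slice(0, limit);
  return out;
}
```

Order matters here: limiting before sorting would keep the first N items in database order rather than the N newest.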
Under the hood, streams use the adapter’s pub/sub system. See svelte-adapter-uws plugins for replay, presence, and throttle plugins.