Streams - live.stream()
A stream gives the client a Svelte store with initial data, then pushes live updates whenever someone publishes to that topic.
Basic usage
// src/live/chat.js
import { live } from 'svelte-realtime/server';
export const messages = live.stream('messages', async (ctx) => {
  return db.messages.latest(50);
}, { merge: 'crud', key: 'id' });

<script>
  import { messages } from '$live/chat';
</script>
{#each $messages as msg (msg.id)}
  <p>{msg.text}</p>
{/each}

Store lifecycle
A stream store's value is always either your data type or undefined (while loading). It is never replaced by an error object. Errors and connection status live on separate reactive stores, so a network failure can never crash your UI:
| Property | Type | Description |
|---|---|---|
| $store | T \| undefined | Your data. On failure, the last loaded value is preserved |
| store.error | Readable<RpcError \| null> | Current error, or null when healthy |
| store.status | Readable<'loading' \| 'connected' \| 'reconnecting' \| 'error'> | Connection status |
{#if $messages === undefined}
  <p>Loading...</p>
{:else}
  {#each $messages as msg (msg.id)}
    <p>{msg.text}</p>
  {/each}
{/if}

For full error and reconnection UI, see Errors - Stream error and status stores.
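The contract described in this section can be illustrated with a small stand-alone model. This is a sketch under stated assumptions, not the library's implementation: `createStreamModel`, `receive`, and `fail` are hypothetical names, and the tiny `writable` helper only mimics the Svelte store contract so the example runs without dependencies.

```javascript
// Minimal stand-in for a Svelte writable store (subscribe/set only).
function writable(initial) {
  let value = initial;
  const subscribers = new Set();
  return {
    subscribe(fn) {
      subscribers.add(fn);
      fn(value); // Svelte stores call a new subscriber immediately
      return () => subscribers.delete(fn);
    },
    set(next) {
      value = next;
      subscribers.forEach((fn) => fn(value));
    },
    get: () => value // convenience for this sketch only
  };
}

// Hypothetical model: data, error, and status live in three separate stores.
function createStreamModel() {
  const data = writable(undefined); // undefined while loading
  const error = writable(null);
  const status = writable('loading');
  return {
    subscribe: data.subscribe,
    get: data.get,
    error,
    status,
    receive(value) {
      data.set(value);
      error.set(null);
      status.set('connected');
    },
    fail(err) {
      // The data store is deliberately left untouched on failure.
      error.set(err);
      status.set('error');
    }
  };
}

const model = createStreamModel();
model.receive([{ id: 1, text: 'hello' }]);
model.fail({ code: 'NETWORK' });
```

After the simulated failure, the data store still holds the last loaded array, while the error and status stores carry the failure. A template that only reads the data store therefore keeps rendering.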
Publishing updates
Use ctx.publish() from any live() function to push events to all subscribers:
export const sendMessage = live(async (ctx, text) => {
  const msg = await db.messages.insert({ text, userId: ctx.user.id });
  ctx.publish('messages', 'created', msg);
  return msg;
});

Dynamic topics
Use a function as the first argument for per-entity streams:
export const roomMessages = live.stream(
  (ctx, roomId) => 'chat:' + roomId,
  async (ctx, roomId) => db.messages.forRoom(roomId),
  { merge: 'crud', key: 'id' }
);

<script>
  import { roomMessages } from '$live/rooms';
  let { data } = $props();
  const messages = roomMessages(data.roomId);
</script>

Same arguments return the same cached store instance.
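One way such caching could work is memoization keyed on the call arguments. The sketch below is an assumption about the behavior, not the library's code: `cachedFactory` is a hypothetical helper, and it assumes the arguments are JSON-serializable.

```javascript
// Hypothetical sketch: memoize store creation by serialized arguments.
function cachedFactory(createStore) {
  const cache = new Map();
  return (...args) => {
    const cacheKey = JSON.stringify(args); // assumes JSON-serializable args
    if (!cache.has(cacheKey)) cache.set(cacheKey, createStore(...args));
    return cache.get(cacheKey);
  };
}

// Stand-in store objects; a real store would also expose subscribe().
const roomStore = cachedFactory((roomId) => ({ topic: 'chat:' + roomId }));
const lobbyA = roomStore('lobby');
const lobbyB = roomStore('lobby');
const general = roomStore('general');
```

Here `lobbyA` and `lobbyB` are the same object, while `general` is a separate instance.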
Stream options
| Option | Default | Description |
|---|---|---|
| merge | 'crud' | How events are applied. See Merge Strategies |
| key | 'id' | Key field for crud mode |
| prepend | false | Prepend new items instead of appending |
| max | 50 / 0 | Max items to keep. Defaults to 50 for latest, 0 (unlimited) for crud. Oldest items are dropped when exceeded |
| replay | false | Enable seq-based replay for gap-free reconnection |
| onSubscribe | - | Callback when a client subscribes |
| onUnsubscribe | - | Callback when a client disconnects |
| filter / access | - | Per-connection publish filter (see Access control) |
| delta | - | Delta sync config (see Delta sync) |
| version | - | Schema version (see Schema evolution) |
| migrate | - | Migration functions (see Schema evolution) |
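To make the interaction of key, prepend, and max concrete, here is a minimal sketch of a crud-style reducer. It is illustrative only: `applyCrudEvent` is a hypothetical function, only the 'created' event name appears on this page ('updated' and 'deleted' are assumed), and treating the far end of the list as "oldest" when prepending is also an assumption.

```javascript
// Hypothetical reducer for a crud-style merge.
// 'updated' and 'deleted' are assumed event names, not confirmed by this page.
function applyCrudEvent(items, event, data, options = {}) {
  const { key = 'id', prepend = false, max = 0 } = options;
  let next = items;
  if (event === 'created') {
    next = prepend ? [data, ...items] : [...items, data];
  } else if (event === 'updated') {
    next = items.map((item) => (item[key] === data[key] ? { ...item, ...data } : item));
  } else if (event === 'deleted') {
    next = items.filter((item) => item[key] !== data[key]);
  }
  // max = 0 means unlimited; otherwise trim the end that holds the oldest items.
  if (max > 0 && next.length > max) {
    next = prepend ? next.slice(0, max) : next.slice(next.length - max);
  }
  return next;
}

let list = [];
list = applyCrudEvent(list, 'created', { id: 1, text: 'a' }, { max: 2 });
list = applyCrudEvent(list, 'created', { id: 2, text: 'b' }, { max: 2 });
list = applyCrudEvent(list, 'created', { id: 3, text: 'c' }, { max: 2 });
list = applyCrudEvent(list, 'updated', { id: 3, text: 'c!' }, { max: 2 });
```

With max set to 2, the third 'created' event pushes out the item with id 1, and the 'updated' event changes the matching item in place.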
Lifecycle hooks
export const presence = live.stream('room:lobby', async (ctx) => {
  return db.presence.list('lobby');
}, {
  merge: 'presence',
  onSubscribe(ctx, topic) {
    ctx.publish(topic, 'join', { key: ctx.user.id, name: ctx.user.name });
  },
  onUnsubscribe(ctx, topic) {
    ctx.publish(topic, 'leave', { key: ctx.user.id });
  }
});

Reconnection
When the WebSocket reconnects, streams automatically refetch initial data and resubscribe. The store keeps showing stale data during the refetch - it does not reset to undefined.
Pagination
SSR hydration
See Client APIs - SSR Hydration.
Channels
Ephemeral pub/sub topics with no database initialization. Clients subscribe and receive live events immediately - no init function, no database query.
// src/live/typing.js
import { live } from 'svelte-realtime/server';
export const typing = live.channel('typing:lobby', { merge: 'presence' });

<script>
  import { typing } from '$live/typing';
</script>
{#each $typing as user (user.key)}
  <span>{user.data.name} is typing...</span>
{/each}

Dynamic channels work the same way:
export const cursors = live.channel(
  (ctx, docId) => 'cursors:' + docId,
  { merge: 'cursor' }
);

live.channel() is essentially live.stream() without an init function. The store starts as an empty value matching the merge strategy (empty array for presence/cursor/crud, null for set) and only receives data through published events.
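The following sketch models how a channel store could start empty and fill up purely from events. It is a guess at the mechanics, not the library's implementation: `initialValueFor` and `applyPresenceEvent` are hypothetical, and the `{ key, data }` item shape is inferred from the template on this page (`user.key`, `user.data.name`).

```javascript
// Starting value per merge strategy, following the description on this page.
function initialValueFor(merge) {
  if (merge === 'set') return null;
  if (['presence', 'cursor', 'crud'].includes(merge)) return [];
  throw new Error('Strategy not covered by this sketch: ' + merge);
}

// Hypothetical presence reducer: 'join' upserts by key, 'leave' removes by key.
function applyPresenceEvent(users, event, payload) {
  const { key, ...data } = payload;
  const others = users.filter((user) => user.key !== key);
  if (event === 'join') return [...others, { key, data }];
  if (event === 'leave') return others;
  return users; // unknown events leave the list unchanged
}

let typingUsers = initialValueFor('presence');
typingUsers = applyPresenceEvent(typingUsers, 'join', { key: 'u1', name: 'Ada' });
typingUsers = applyPresenceEvent(typingUsers, 'join', { key: 'u2', name: 'Lin' });
typingUsers = applyPresenceEvent(typingUsers, 'leave', { key: 'u1' });
```

After these three events only the entry for `u2` remains, with its name available under `data.name`.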
Derived streams
Server-side computed streams that recompute when any source topic publishes. List the source topics and provide a computation function; clients then subscribe as they would to a normal stream.
import { live } from 'svelte-realtime/server';
export const dashboardStats = live.derived(
  ['orders', 'inventory', 'users'],
  async () => {
    return {
      totalOrders: await db.orders.count(),
      lowStock: await db.inventory.lowStockCount(),
      activeUsers: await db.users.activeCount()
    };
  },
  { debounce: 500 }
);

On the client, derived streams work like regular streams:
<script>
  import { dashboardStats } from '$live/dashboard';
</script>
<p>Orders: {$dashboardStats?.totalOrders}</p>

Dynamic derived streams
When source topics depend on runtime arguments (e.g. an org ID, a room ID), pass a source factory function instead of a static array. The factory receives the same args the client passes at subscribe time:
export const orgStats = live.derived(
  (orgId) => [`memberships:${orgId}`, `emails:${orgId}`, `audit:${orgId}`],
  async (ctx, orgId) => {
    const [members, emails, auditCount] = await Promise.all([
      db.query('SELECT count(*) FROM memberships WHERE org_id = $1', [orgId]),
      db.query('SELECT count(*) FROM emails WHERE org_id = $1', [orgId]),
      db.query('SELECT count(*) FROM audit_log WHERE org_id = $1', [orgId])
    ]);
    return { members, emails, auditCount };
  },
  { debounce: 100 }
);

On the client, dynamic derived streams are called like functions:
<script>
  import { orgStats } from '$live/dashboard';
  let { orgId } = $props();
</script>
<p>Members: {$orgStats(orgId)?.members}</p>

Each unique set of args creates an independent instance with its own source subscriptions. Instances are created when the first subscriber connects and cleaned up when the last subscriber disconnects.
The dynamic compute function receives ctx.user from the subscribing client, so guards like if (orgId !== ctx.user.organization_id) throw new LiveError('FORBIDDEN') work the same as in regular stream handlers.
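The per-args instance lifecycle described above resembles a reference-counted registry. The sketch below shows that pattern in isolation. It is an assumption about the mechanics rather than the library's code, and `createInstanceRegistry` and `dispose` are hypothetical names.

```javascript
// Hypothetical registry: one instance per unique args, reference-counted by subscribers.
function createInstanceRegistry(createInstance) {
  const instances = new Map();
  return {
    subscribe(...args) {
      const id = JSON.stringify(args); // assumes JSON-serializable args
      let entry = instances.get(id);
      if (!entry) {
        // First subscriber for these args: create the instance.
        entry = { instance: createInstance(...args), count: 0 };
        instances.set(id, entry);
      }
      entry.count += 1;
      let active = true;
      return function unsubscribe() {
        if (!active) return; // ignore repeated unsubscribe calls
        active = false;
        entry.count -= 1;
        if (entry.count === 0) {
          // Last subscriber left: clean up.
          entry.instance.dispose();
          instances.delete(id);
        }
      };
    },
    size: () => instances.size
  };
}

const disposed = [];
const registry = createInstanceRegistry((orgId) => ({
  dispose: () => disposed.push(orgId)
}));
const leaveFirst = registry.subscribe('org-1');
const leaveSecond = registry.subscribe('org-1');
leaveFirst();
const sizeWhileSubscribed = registry.size();
leaveSecond();
```

The instance for `org-1` survives the first unsubscribe and is disposed only when the second subscriber leaves.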
Activation
Call _activateDerived(platform) in your open hook to enable derived stream listeners:
import { _activateDerived } from 'svelte-realtime/server';
export function open(ws, { platform }) {
  _activateDerived(platform);
}

Without this call, derived streams still serve their initial SSR data but never receive live updates. In dev mode, a console warning is emitted when a client subscribes to a derived stream and _activateDerived has not been called.
| Option | Default | Description |
|---|---|---|
| merge | 'set' | Merge strategy for the derived topic |
| debounce | 0 | Debounce recomputation by this many milliseconds |
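The effect of the debounce option can be pictured as follows: several source events arriving close together collapse into a single recomputation. This is a generic debounce sketch, not the library's implementation. `createDebouncedRecompute` is a hypothetical name, the scheduler is injectable so the demo needs no real waiting, and treating a debounce of 0 as "recompute immediately" is an assumption.

```javascript
// Hypothetical debounced recompute with an injectable scheduler.
function createDebouncedRecompute(recompute, debounceMs, schedule = setTimeout, cancel = clearTimeout) {
  let pending = null;
  return function onSourceEvent() {
    if (debounceMs <= 0) {
      recompute(); // assumed: no debounce means recompute on every event
      return;
    }
    if (pending !== null) cancel(pending); // restart the quiet period
    pending = schedule(() => {
      pending = null;
      recompute();
    }, debounceMs);
  };
}

// Manual fake timers for the demonstration below.
const timerQueue = new Map();
let nextTimerId = 1;
const fakeSchedule = (fn) => {
  const id = nextTimerId++;
  timerQueue.set(id, fn);
  return id;
};
const fakeCancel = (id) => timerQueue.delete(id);
function flushTimers() {
  const callbacks = [...timerQueue.values()];
  timerQueue.clear();
  callbacks.forEach((fn) => fn());
}

let recomputeCount = 0;
const onEvent = createDebouncedRecompute(() => { recomputeCount += 1; }, 500, fakeSchedule, fakeCancel);
onEvent(); // 'orders' publishes
onEvent(); // 'inventory' publishes
onEvent(); // 'users' publishes
flushTimers(); // the quiet period elapses
```

Three source events inside one debounce window produce a single recomputation, which is the reason to raise debounce when the compute function runs expensive queries.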
Effects
Server-side reactive side effects that fire when source topics publish. Fire-and-forget - no topic, no client subscription.
// src/live/notifications.js
import { live } from 'svelte-realtime/server';
export const orderNotifications = live.effect(['orders'], async (event, data, platform) => {
  if (event === 'created') {
    await email.send(data.userEmail, 'Order confirmed', templates.orderConfirm(data));
  }
});

Effects are server-only. They fire whenever a matching topic publishes and cannot be subscribed to from the client. Use them for sending emails, logging analytics, syncing to external services, or any side effect that should happen in response to a topic event.
Aggregates
Real-time incremental aggregations. Reducers run on each event, maintaining O(1) state. The aggregate publishes its state to the output topic on every event, and clients subscribe to that output topic as a regular stream.
// src/live/stats.js
import { live } from 'svelte-realtime/server';
export const orderStats = live.aggregate('orders', {
  count: { init: () => 0, reduce: (acc, event) => event === 'created' ? acc + 1 : acc },
  total: { init: () => 0, reduce: (acc, event, data) => event === 'created' ? acc + data.amount : acc },
  avg: { compute: (state) => state.count > 0 ? state.total / state.count : 0 }
}, { topic: 'order-stats' });

Each field in the aggregate config can have:
- init: returns the initial value for the accumulator
- reduce: called on every event with (acc, event, data); returns the new accumulator value
- compute: a derived field computed from the current state of all other fields (not called with events directly)
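To show how the three field kinds fit together, here is a minimal stand-alone runner for a config of this shape. It is a sketch of the idea only: `createAggregate` and `apply` are hypothetical names, and the real library may order or evaluate fields differently.

```javascript
// Hypothetical runner for an aggregate config with init/reduce/compute fields.
function createAggregate(fields) {
  const state = {};
  for (const [name, field] of Object.entries(fields)) {
    if (field.init) state[name] = field.init();
  }
  function snapshot() {
    const result = { ...state };
    // compute fields are derived from the accumulated state, not from events.
    for (const [name, field] of Object.entries(fields)) {
      if (field.compute) result[name] = field.compute(result);
    }
    return result;
  }
  return {
    apply(event, data) {
      for (const [name, field] of Object.entries(fields)) {
        if (field.reduce) state[name] = field.reduce(state[name], event, data);
      }
      return snapshot(); // the value that would be published to the output topic
    }
  };
}

const stats = createAggregate({
  count: { init: () => 0, reduce: (acc, event) => (event === 'created' ? acc + 1 : acc) },
  total: { init: () => 0, reduce: (acc, event, data) => (event === 'created' ? acc + data.amount : acc) },
  avg: { compute: (state) => (state.count > 0 ? state.total / state.count : 0) }
});
stats.apply('created', { amount: 10 });
const latestStats = stats.apply('created', { amount: 30 });
```

Each event touches only the accumulators, so the work per event stays constant regardless of how many orders have been seen.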
Gates
Conditional stream activation. On the server, a predicate controls whether the client subscribes. On the client, .when() manages the subscription lifecycle.
// src/live/beta.js
import { live } from 'svelte-realtime/server';
export const betaFeed = live.gate(
  (ctx) => ctx.user?.flags?.includes('beta'),
  live.stream('beta-feed', async (ctx) => db.betaFeed.latest(50), { merge: 'latest' })
);

<script>
  import { betaFeed } from '$live/beta';
  import { writable } from 'svelte/store';
  const tabActive = writable(true);
  const feed = betaFeed.when(tabActive);
</script>
{#if $feed !== undefined}
  {#each $feed as item (item.id)}
    <p>{item.title}</p>
  {/each}
{/if}

When the predicate returns false, the server responds with a graceful no-op (no error, no subscription). The client store stays undefined.

.when() accepts a boolean, a Svelte store, or a getter function. When given a store, it subscribes/unsubscribes reactively as the value changes. Getter functions are evaluated once at subscribe time; for reactivity with Svelte 5 $state, wrap in $derived or pass a store.
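The difference between the three condition kinds can be sketched with a small normalizer. This is an illustration of the described behavior, not the library's code: `watchCondition` and `miniStore` are hypothetical, with `miniStore` standing in for a Svelte store.

```javascript
// Hypothetical normalizer for boolean, store, and getter conditions.
function watchCondition(condition, onChange) {
  if (condition && typeof condition.subscribe === 'function') {
    // Store: react to every change and hand back the store's unsubscribe function.
    return condition.subscribe((value) => onChange(Boolean(value)));
  }
  // Boolean or getter: evaluated once, never re-checked.
  const value = typeof condition === 'function' ? condition() : condition;
  onChange(Boolean(value));
  return () => {};
}

// Minimal store stand-in for the demo.
function miniStore(initial) {
  let value = initial;
  const subscribers = new Set();
  return {
    subscribe(fn) {
      subscribers.add(fn);
      fn(value);
      return () => subscribers.delete(fn);
    },
    set(next) {
      value = next;
      subscribers.forEach((fn) => fn(value));
    }
  };
}

const seenFromStore = [];
const tab = miniStore(true);
const stopWatching = watchCondition(tab, (active) => seenFromStore.push(active));
tab.set(false); // observed: the store path is reactive
stopWatching();
tab.set(true); // not observed: already unsubscribed

const seenFromGetter = [];
watchCondition(() => true, (active) => seenFromGetter.push(active));
```

The store path sees both the initial value and the later change, while the getter path sees exactly one value. That one-shot evaluation is why a plain getter over changing state does not keep a gated subscription in sync.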
Pipes
Composable server-side stream transforms. Apply filter, sort, limit, and join operations to both initial data and live events.
// src/live/notifications.js
import { live, pipe } from 'svelte-realtime/server';
export const myNotifications = pipe(
  live.stream('notifications', async (ctx) => {
    return db.notifications.forUser(ctx.user.id);
  }, { merge: 'crud', key: 'id' }),
  pipe.filter((ctx, item) => !item.dismissed),
  pipe.sort('createdAt', 'desc'),
  pipe.limit(20),
  pipe.join('authorId', async (id) => db.users.getName(id), 'authorName')
);

| Transform | Initial data | Live events |
|---|---|---|
| pipe.filter(predicate) | Filters the array | Initial data only |
| pipe.sort(field, dir) | Sorts the array | Initial data only |
| pipe.limit(n) | Slices to N items | Initial data only |
| pipe.join(field, resolver, as) | Enriches each item | Initial data only |
Piped functions preserve all stream metadata. The client receives already-transformed data. Import pipe from 'svelte-realtime/server' alongside live.
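As a rough picture of what the transforms do to initial data, the sketch below composes filter, sort, and limit steps over a plain array. The helpers are hypothetical stand-ins, not the library's `pipe` functions, and the asynchronous join step is left out to keep the example synchronous.

```javascript
// Hypothetical stand-ins for filter / sort / limit, applied to initial data only.
const filterStep = (predicate) => (ctx, items) => items.filter((item) => predicate(ctx, item));
const sortStep = (field, dir = 'asc') => (ctx, items) =>
  [...items].sort((a, b) => {
    const order = a[field] < b[field] ? -1 : a[field] > b[field] ? 1 : 0;
    return dir === 'desc' ? -order : order;
  });
const limitStep = (n) => (ctx, items) => items.slice(0, n);

// Run each transform in order, feeding the output of one into the next.
function runPipeline(ctx, items, transforms) {
  return transforms.reduce((current, transform) => transform(ctx, current), items);
}

const notifications = [
  { id: 1, createdAt: 10, dismissed: false },
  { id: 2, createdAt: 30, dismissed: true },
  { id: 3, createdAt: 20, dismissed: false },
  { id: 4, createdAt: 40, dismissed: false }
];
const visible = runPipeline({ user: { id: 'u1' } }, notifications, [
  filterStep((ctx, item) => !item.dismissed),
  sortStep('createdAt', 'desc'),
  limitStep(2)
]);
```

Order matters in this composition: filtering before limiting yields the two newest undismissed items (ids 4 and 3), whereas limiting first could return fewer visible items.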
Under the hood, streams use the adapter’s pub/sub system. See svelte-adapter-uws plugins for replay, presence, and throttle plugins.