Streams - live.stream()
A stream gives the client a Svelte store with initial data, then pushes live updates whenever someone publishes to that topic.
Basic usage
// src/live/chat.js
import { live } from 'svelte-realtime/server';
export const messages = live.stream('messages', async (ctx) => {
return db.messages.latest(50);
}, { merge: 'crud', key: 'id' });
<script>
import { messages } from '$live/chat';
</script>
{#each $messages as msg (msg.id)}
<p>{msg.text}</p>
{/each}
Store lifecycle
A stream store value is always your data type or undefined while loading. It is never replaced by an error object - errors and connection status live on separate reactive stores so a network failure can never crash your UI:
| Property | Type | Description |
|---|---|---|
| $store | T \| undefined | Your data. On failure, the last loaded value is preserved |
| store.error | Readable<RpcError \| null> | Current error, or null when healthy |
| store.status | Readable<'loading' \| 'connected' \| 'reconnecting' \| 'error'> | Connection status |
{#if $messages === undefined}
<p>Loading...</p>
{:else}
{#each $messages as msg (msg.id)}
<p>{msg.text}</p>
{/each}
{/if}
For full error and reconnection UI, see Errors - Stream error and status stores.
Publishing updates
Use ctx.publish() from any live() function to push events to all subscribers:
export const sendMessage = live(async (ctx, text) => {
const msg = await db.messages.insert({ text, userId: ctx.user.id });
ctx.publish('messages', 'created', msg);
return msg;
});
Dynamic topics
Use a function as the first argument for per-entity streams:
export const roomMessages = live.stream(
(ctx, roomId) => 'chat:' + roomId,
async (ctx, roomId) => db.messages.forRoom(roomId),
{ merge: 'crud', key: 'id' }
);
<script>
import { roomMessages } from '$live/rooms';
let { data } = $props();
const messages = roomMessages(data.roomId);
</script>
Same arguments return the same cached store instance.
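One way to picture the caching behaviour is a factory memoized on its arguments. The following is a minimal sketch, not the library's implementation: `cachedFactory` is a hypothetical helper, and using `JSON.stringify` as the cache key is an assumption that only suits JSON-serializable arguments.

```javascript
// Hypothetical helper: memoize a store factory on its arguments.
// Assumption: args are JSON-serializable, so JSON.stringify works as a cache key.
function cachedFactory(create) {
  const cache = new Map();
  return (...args) => {
    const key = JSON.stringify(args);
    if (!cache.has(key)) cache.set(key, create(...args));
    return cache.get(key);
  };
}

// Stand-in for a dynamic stream; a real one would open a subscription here.
const roomStore = cachedFactory((roomId) => ({ topic: 'chat:' + roomId }));

const a = roomStore('lobby');
const b = roomStore('lobby');
const c = roomStore('general');
console.log(a === b); // true: same args, same instance
console.log(a === c); // false: different args, different instance
```

In practice this means two components calling roomMessages(data.roomId) with the same room id can share one store rather than each holding its own.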
Topic registry: defineTopics
Centralize topic strings so the SQL trigger, the stream definition, and any out-of-tree producer all reference one source of truth. defineTopics(map) validates the map at boot and exposes __patterns for tooling.
// src/lib/topics.js
import { defineTopics } from 'svelte-realtime/server';
export const TOPICS = defineTopics({
audit: (orgId) => `audit:${orgId}`,
security: (orgId) => `security:${orgId}`,
systemNotices: 'system:notices'
});
// Stream definitions reference the registry:
live.stream((ctx, orgId) => TOPICS.audit(orgId), loadAudit, { merge: 'crud' });
// Tooling reads __patterns to derive shapes:
TOPICS.__patterns;
// => { audit: 'audit:{arg0}', security: 'security:{arg0}', systemNotices: 'system:notices' }
Map values can be static strings or (...args) => string functions. The helper validates non-empty strings and rejects reserved names (__patterns, __definedTopics). Pattern derivation calls each function with sentinel placeholders ({arg0}, {arg1}, …) and falls back to '<dynamic>' if the function throws on placeholders or returns a non-string.
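The derivation rule described above can be sketched in plain JavaScript. This is an illustration only: `derivePatterns` is a hypothetical stand-in for what defineTopics does internally, and passing one placeholder per declared parameter (via the function's `length`) is an assumption.

```javascript
const RESERVED = new Set(['__patterns', '__definedTopics']);

// Hypothetical re-implementation of the pattern derivation described above.
function derivePatterns(map) {
  const patterns = {};
  for (const [name, value] of Object.entries(map)) {
    if (RESERVED.has(name)) throw new Error(`reserved topic name: ${name}`);
    if (typeof value === 'string') {
      if (value === '') throw new Error(`empty topic string: ${name}`);
      patterns[name] = value;
      continue;
    }
    if (typeof value !== 'function') throw new Error(`invalid topic value: ${name}`);
    // Assumption: one sentinel placeholder per declared parameter.
    const placeholders = Array.from({ length: value.length }, (_, i) => `{arg${i}}`);
    try {
      const out = value(...placeholders);
      patterns[name] = typeof out === 'string' ? out : '<dynamic>';
    } catch {
      // The function rejected the placeholder, so no pattern can be derived.
      patterns[name] = '<dynamic>';
    }
  }
  return patterns;
}

const patterns = derivePatterns({
  audit: (orgId) => `audit:${orgId}`,
  systemNotices: 'system:notices',
  // Throws on the non-numeric placeholder, so it falls back to '<dynamic>'.
  strict: (id) => {
    if (!/^\d+$/.test(id)) throw new Error('numeric id required');
    return `item:${id}`;
  }
});
console.log(patterns);
// { audit: 'audit:{arg0}', systemNotices: 'system:notices', strict: '<dynamic>' }
```

The `strict` entry shows why the fallback exists: a topic function that validates its input cannot be probed with placeholders.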
Build-time registry check
When the Vite plugin sees a defineTopics({...}) call anywhere under src/, it builds a registry of patterns and validates string-literal topics passed to live.stream(...) and live.channel(...) against it. A literal that does not match any registered pattern triggers a one-shot warning per (file, topic) pair:
[svelte-realtime] src/live/feed.js: live.stream topic 'mistyped-topic' is not in
your TOPICS registry. Either add it to defineTopics({...}) or call TOPICS.<name>(...)
instead of passing a string literal.
The check covers two value shapes per topic key:
- static string literals (e.g. feed: 'feed:notices')
- arrow functions that return a template literal (e.g. audit: (orgId) => `audit:${orgId}`)
For arrow functions, each template interpolation matches .+, so 'audit:org-123' and 'audit:any-id' both pass against the audit pattern. Function references and other dynamic value shapes are silently skipped at parse time - the warning only fires for literal topics under a confidently parsed registry. If your project does not call defineTopics at all, the check is disabled.
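To make the `.+` matching rule concrete, here is a small sketch that turns a pattern into a regular expression and tests literal topics against it. The helper names are hypothetical, and representing interpolations as `{argN}` placeholders (the `__patterns` form) is an assumption about how the plugin stores them.

```javascript
// Escape regex metacharacters in the literal parts of a pattern.
function escapeRegExp(s) {
  return s.replace(/[.*+?^${}()|[\]\\]/g, '\\$&');
}

// Turn 'audit:{arg0}' into /^audit:.+$/ (each placeholder becomes .+).
function patternToRegExp(pattern) {
  const source = pattern
    .split(/\{arg\d+\}/)
    .map(escapeRegExp)
    .join('.+');
  return new RegExp(`^${source}$`);
}

// A literal topic passes if it matches any registered pattern.
function isRegistered(topic, patterns) {
  return Object.values(patterns).some((p) => patternToRegExp(p).test(topic));
}

const topicPatterns = { audit: 'audit:{arg0}', systemNotices: 'system:notices' };
console.log(isRegistered('audit:org-123', topicPatterns));  // true
console.log(isRegistered('system:notices', topicPatterns)); // true
console.log(isRegistered('mistyped-topic', topicPatterns)); // false
console.log(isRegistered('audit:', topicPatterns));         // false: .+ needs at least one character
```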
Stream options
| Option | Default | Description |
|---|---|---|
| merge | 'crud' | How events are applied. See Merge Strategies |
| key | 'id' | Key field for crud mode |
| prepend | false | Prepend new items instead of appending |
| max | 50 / 0 | Max items to keep. Defaults to 50 for latest, 0 (unlimited) for crud. Oldest items are dropped when exceeded |
| replay | false | Enable seq-based replay for gap-free reconnection |
| args | - | Standard Schema (Zod / Valibot / custom) applied to dynamic-topic factory args before the loader runs. Rejects with VALIDATION on bad input |
| transform | - | (ctx, item) => item projection applied to BOTH initial-load data and every live publish. Use for select-trimming, computed fields |
| coalesceBy | - | (event, data) => key per-subscriber coalesce key. Same-key publishes within the same flush tick collapse to the latest. Cannot combine with volatile |
| volatile | false | Skip per-topic seq stamping. Use for ephemeral channels (cursor moves, typing indicators) where replay / gap-fill is meaningless. Cannot combine with coalesceBy |
| staleAfterMs | - | If no publish lands within this window, fire the onStale hook (or onError) so the stream can refresh its initial state. Watchdog primitive for “stream went quiet” recovery |
| invalidateOn | - | string \| string[] of topics; on ctx.publish(invalidatedTopic, 'invalidate', ...), the stream re-runs its init loader and re-broadcasts the result. The companion ctx.invalidate(topic) is the publish-side trigger |
| onError | - | (err, ctx) => void per-stream error hook. Receives errors from init loader, transform, filter, access, and lifecycle hooks. Defaults to the global onError registry |
| classOfService | - | Named load-shedding class registered via live.admission(). New subscribes shed under matching pressure. Existing subscribers unaffected. See API -> live.admission |
| onSubscribe | - | (ctx, topic) => void fires when a client subscribes |
| onUnsubscribe | - | (ctx, topic, remainingSubscribers) => void fires when a client disconnects. The third arg is the count of remaining subscribers on the topic AFTER this client leaves. Use === 0 to tear down upstream feeds, close database listeners, etc. |
| filter / access | - | Per-connection publish filter (see Access control) |
| delta | - | Delta sync config (see Delta sync) |
| version | - | Schema version (see Schema evolution) |
| migrate | - | Migration functions (see Schema evolution) |
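The coalesceBy row is easier to follow with a model of what "collapse to the latest within a flush tick" could look like. This is a rough sketch under stated assumptions, not the library's internals: `createCoalescingBuffer` is hypothetical, and the choice to order flushed events by their most recent publish is an assumption.

```javascript
// Hypothetical per-subscriber buffer: publishes queue up until flush() runs.
function createCoalescingBuffer(coalesceBy) {
  const pending = new Map(); // coalesce key -> latest { event, data }
  return {
    push(event, data) {
      const key = coalesceBy(event, data);
      // Delete before set so the key moves to the end of the Map's order.
      pending.delete(key);
      pending.set(key, { event, data });
    },
    // Emit one entry per key, then start a fresh tick.
    flush() {
      const out = [...pending.values()];
      pending.clear();
      return out;
    }
  };
}

const buffer = createCoalescingBuffer((event, data) => `${event}:${data.userId}`);
buffer.push('cursor', { userId: 'a', x: 1 });
buffer.push('cursor', { userId: 'b', x: 5 });
buffer.push('cursor', { userId: 'a', x: 2 }); // replaces the earlier 'a' entry
const flushed = buffer.flush();
console.log(flushed.map((e) => e.data));
// [ { userId: 'b', x: 5 }, { userId: 'a', x: 2 } ]
```

Three publishes become two deliveries, which is the point of coalescing high-frequency events such as cursor positions.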
Lifecycle hooks
export const presence = live.stream('room:lobby', async (ctx) => {
return db.presence.list('lobby');
}, {
merge: 'presence',
onSubscribe(ctx, topic) {
ctx.publish(topic, 'join', { key: ctx.user.id, name: ctx.user.name });
},
onUnsubscribe(ctx, topic, remainingSubscribers) {
ctx.publish(topic, 'leave', { key: ctx.user.id });
if (remainingSubscribers === 0) {
// last client out: close the upstream Kafka consumer for this room
upstream.unsubscribe(`kafka:room:lobby`);
}
}
});
The remainingSubscribers count is computed AFTER this client’s leave is applied. 0 means this was the last subscriber; the topic is effectively idle and any upstream feed wired during onSubscribe should be torn down here.
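The "count after the leave is applied" semantics can be modelled with a small subscriber registry. This is a hedged sketch for illustration; `createTopicRegistry` and its methods are hypothetical names, not part of svelte-realtime.

```javascript
// Hypothetical registry tracking which clients are subscribed to each topic.
function createTopicRegistry() {
  const subscribers = new Map(); // topic -> Set of client ids
  return {
    subscribe(topic, clientId) {
      if (!subscribers.has(topic)) subscribers.set(topic, new Set());
      subscribers.get(topic).add(clientId);
    },
    // Removes the client first, then returns the number of subscribers left.
    unsubscribe(topic, clientId) {
      const set = subscribers.get(topic);
      if (!set) return 0;
      set.delete(clientId);
      const remaining = set.size;
      if (remaining === 0) subscribers.delete(topic); // topic is idle
      return remaining;
    }
  };
}

const topics = createTopicRegistry();
topics.subscribe('room:lobby', 'alice');
topics.subscribe('room:lobby', 'bob');
const afterAlice = topics.unsubscribe('room:lobby', 'alice');
const afterBob = topics.unsubscribe('room:lobby', 'bob');
console.log(afterAlice); // 1: bob is still subscribed
console.log(afterBob);   // 0: last client out, safe to tear down upstream feeds
```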
Staleness watchdog + topic-driven invalidation
export const orgFeed = live.stream(
(ctx) => `org:${ctx.user.orgId}:feed`,
async (ctx) => db.feed.forOrg(ctx.user.orgId),
{
merge: 'crud',
staleAfterMs: 30_000,
invalidateOn: ['org:settings:updated'],
onError(err, ctx) {
logger.warn({ err, topic: ctx.topic }, 'stream error');
}
}
);
// From any handler that mutates the org's settings:
export const updateSettings = live(async (ctx, patch) => {
await db.orgs.update(ctx.user.orgId, patch);
ctx.invalidate(`org:${ctx.user.orgId}:feed`); // triggers init re-run on every subscriber
});
staleAfterMs is a watchdog: if no publish lands on the topic within the window, the stream re-runs its init loader and re-broadcasts. Useful when the publish path is fragile (external feed, derived computation) and “stream went quiet” should be self-healing. invalidateOn is the same shape but triggered by a known publish event on another topic - mutations on the trigger topic invalidate the listening stream.
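The watchdog idea can be sketched without real timers by passing the current time in explicitly. This is a simplified model, not the library's scheduler: `createStaleWatchdog`, `touch`, and `check` are hypothetical names, and restarting the window after each firing is an assumption.

```javascript
// Hypothetical watchdog; the caller supplies timestamps so the logic is deterministic.
function createStaleWatchdog(staleAfterMs, onStale) {
  let lastPublishAt = 0;
  return {
    // Call on every publish to the topic.
    touch(now) {
      lastPublishAt = now;
    },
    // Call periodically; fires onStale when the quiet window has elapsed.
    check(now) {
      if (now - lastPublishAt >= staleAfterMs) {
        lastPublishAt = now; // assumption: restart the window after firing
        onStale();
        return true;
      }
      return false;
    }
  };
}

let refreshes = 0;
const watchdog = createStaleWatchdog(30_000, () => { refreshes += 1; });
watchdog.touch(0);
watchdog.check(10_000); // false: only 10s of silence
watchdog.touch(20_000); // a publish lands and resets the window
watchdog.check(45_000); // false: 25s since the last publish
watchdog.check(50_000); // true: 30s of silence, refresh fires
console.log(refreshes); // 1
```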
Reconnection
When the WebSocket reconnects, streams automatically refetch initial data and resubscribe. The store keeps showing stale data during the refetch - it does not reset to undefined.
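As a mental model, the store's value and status can be treated as a small state machine in which a dropped socket changes only the status. The sketch below is illustrative: the action names are hypothetical, while the status strings are the ones listed for store.status.

```javascript
// Hypothetical reducer modelling value and status across a reconnect.
const initialState = { value: undefined, status: 'loading' };

function step(state, action) {
  switch (action.type) {
    case 'loaded':
      // Initial load or refetch finished: replace the data.
      return { value: action.data, status: 'connected' };
    case 'socket-closed':
      // Keep the last value; only the status changes.
      return { ...state, status: 'reconnecting' };
    default:
      return state;
  }
}

let state = initialState;
state = step(state, { type: 'loaded', data: [{ id: 1 }] });
state = step(state, { type: 'socket-closed' });
const duringReconnect = state;
state = step(state, { type: 'loaded', data: [{ id: 1 }, { id: 2 }] });

console.log(duringReconnect.status, duringReconnect.value.length); // reconnecting 1
console.log(state.status, state.value.length);                     // connected 2
```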
Pagination
SSR hydration
See Client APIs - SSR Hydration.
Channels
Ephemeral pub/sub topics with no database initialization. Clients subscribe and receive live events immediately - no init function, no database query.
// src/live/typing.js
import { live } from 'svelte-realtime/server';
export const typing = live.channel('typing:lobby', { merge: 'presence' });
<script>
import { typing } from '$live/typing';
</script>
{#each $typing as user (user.key)}
<span>{user.data.name} is typing...</span>
{/each}
Dynamic channels work the same way:
export const cursors = live.channel(
(ctx, docId) => 'cursors:' + docId,
{ merge: 'cursor' }
);
live.channel() is essentially live.stream() without an init function. The store starts as an empty value matching the merge strategy (empty array for presence/cursor/crud, null for set) and only receives data through published events.
Derived streams
Server-side computed streams that recompute when any source topic publishes. List the source topics and provide a computation function; clients then subscribe as they would to a normal stream.
import { live } from 'svelte-realtime/server';
export const dashboardStats = live.derived(
['orders', 'inventory', 'users'],
async () => {
return {
totalOrders: await db.orders.count(),
lowStock: await db.inventory.lowStockCount(),
activeUsers: await db.users.activeCount()
};
},
{ debounce: 500 }
);
On the client, derived streams work like regular streams:
<script>
import { dashboardStats } from '$live/dashboard';
</script>
<p>Orders: {$dashboardStats?.totalOrders}</p>
Dynamic derived streams
When source topics depend on runtime arguments (e.g. an org ID, a room ID), pass a source factory function instead of a static array. The factory receives the same args the client passes at subscribe time:
export const orgStats = live.derived(
(orgId) => [`memberships:${orgId}`, `emails:${orgId}`, `audit:${orgId}`],
async (ctx, orgId) => {
const [members, emails, auditCount] = await Promise.all([
db.query('SELECT count(*) FROM memberships WHERE org_id = $1', [orgId]),
db.query('SELECT count(*) FROM emails WHERE org_id = $1', [orgId]),
db.query('SELECT count(*) FROM audit_log WHERE org_id = $1', [orgId])
]);
return { members, emails, auditCount };
},
{ debounce: 100 }
);
On the client, dynamic derived streams are called like functions:
<script>
import { orgStats } from '$live/dashboard';
let { orgId } = $props();
</script>
<p>Members: {$orgStats(orgId)?.members}</p>
Each unique set of args creates an independent instance with its own source subscriptions. Instances are created when the first subscriber connects and cleaned up when the last subscriber disconnects.
The dynamic compute function receives ctx.user from the subscribing client, so guards like if (orgId !== ctx.user.organization_id) throw new LiveError('FORBIDDEN') work the same as in regular stream handlers.
Activation
Call _activateDerived(platform) in your open hook to enable derived stream listeners:
import { _activateDerived } from 'svelte-realtime/server';
export function open(ws, { platform }) {
_activateDerived(platform);
}
Without this call, derived streams still serve their initial SSR data but never receive live updates. In dev mode, a console warning is emitted when a client subscribes to a derived stream and _activateDerived has not been called.
| Option | Default | Description |
|---|---|---|
| merge | 'set' | Merge strategy for the derived topic |
| debounce | 0 | Debounce recomputation by this many milliseconds |
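The effect of debounce is that a burst of source publishes triggers one recomputation rather than one per publish. The following is a minimal sketch of that behaviour, not the library's code: `createDebouncedRecompute` is hypothetical, and the injectable `timers` object with its fake implementation exists only to make the demo deterministic.

```javascript
// Hypothetical debouncer; timers are injectable so the demo needs no real delay.
function createDebouncedRecompute(compute, debounceMs, timers = globalThis) {
  let handle = null;
  return function onSourcePublish() {
    if (debounceMs === 0) return compute(); // default: recompute immediately
    if (handle !== null) timers.clearTimeout(handle);
    handle = timers.setTimeout(() => {
      handle = null;
      compute();
    }, debounceMs);
  };
}

// Minimal fake timers: callbacks run only when runAll() is called.
const fakeTimers = {
  queue: new Map(),
  nextId: 1,
  setTimeout(fn) {
    const id = this.nextId++;
    this.queue.set(id, fn);
    return id;
  },
  clearTimeout(id) {
    this.queue.delete(id);
  },
  runAll() {
    const fns = [...this.queue.values()];
    this.queue.clear();
    fns.forEach((fn) => fn());
  }
};

let recomputes = 0;
const onPublish = createDebouncedRecompute(() => { recomputes += 1; }, 500, fakeTimers);
onPublish(); // 'orders' publishes
onPublish(); // 'inventory' publishes
onPublish(); // 'users' publishes
fakeTimers.runAll();
console.log(recomputes); // 1
```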
Effects
Server-side reactive side effects that fire when source topics publish. Fire-and-forget - no topic, no client subscription.
// src/live/notifications.js
import { live } from 'svelte-realtime/server';
export const orderNotifications = live.effect(['orders'], async (event, data, platform) => {
if (event === 'created') {
await email.send(data.userEmail, 'Order confirmed', templates.orderConfirm(data));
}
});
Effects are server-only. They fire whenever a matching topic publishes and cannot be subscribed to from the client. Use them for sending emails, logging analytics, syncing to external services, or any side effect that should happen in response to a topic event.
Aggregates
Real-time incremental aggregations. Reducers run on each event, maintaining O(1) state. The aggregate publishes its state to the output topic on every event, and clients subscribe to the output topic as a regular stream.
// src/live/stats.js
import { live } from 'svelte-realtime/server';
export const orderStats = live.aggregate('orders', {
count: { init: () => 0, reduce: (acc, event) => event === 'created' ? acc + 1 : acc },
total: { init: () => 0, reduce: (acc, event, data) => event === 'created' ? acc + data.amount : acc },
avg: { compute: (state) => state.count > 0 ? state.total / state.count : 0 }
}, { topic: 'order-stats' });
Each field in the aggregate config can have:
- init - returns the initial value for the accumulator
- reduce - called on every event with (acc, event, data); returns the new accumulator value
- compute - a derived field computed from the current state of all other fields (not called with events directly)
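To show how the three field roles interact, here is a small stand-alone model that runs the same field config against a few events. It is a sketch, not the library's engine: `createAggregate`, `apply`, and `snapshot` are hypothetical names, and evaluating compute fields after all reducers have run is an assumption.

```javascript
// Hypothetical aggregate runner: reducers update state, compute fields derive from it.
function createAggregate(fields) {
  const state = {};
  for (const [name, field] of Object.entries(fields)) {
    if (field.init) state[name] = field.init();
  }
  function snapshot() {
    const out = { ...state };
    for (const [name, field] of Object.entries(fields)) {
      if (field.compute) out[name] = field.compute(out);
    }
    return out;
  }
  return {
    // Run every reducer once for this event, then return the published state.
    apply(event, data) {
      for (const [name, field] of Object.entries(fields)) {
        if (field.reduce) state[name] = field.reduce(state[name], event, data);
      }
      return snapshot();
    },
    snapshot
  };
}

const stats = createAggregate({
  count: { init: () => 0, reduce: (acc, event) => (event === 'created' ? acc + 1 : acc) },
  total: { init: () => 0, reduce: (acc, event, data) => (event === 'created' ? acc + data.amount : acc) },
  avg: { compute: (state) => (state.count > 0 ? state.total / state.count : 0) }
});

stats.apply('created', { amount: 10 });
stats.apply('updated', { amount: 99 }); // ignored by both reducers
const latest = stats.apply('created', { amount: 20 });
console.log(latest); // { count: 2, total: 30, avg: 15 }
```

Each event touches only the accumulators, so the work per event stays constant regardless of how many orders have been seen.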
For time-windowed aggregations (lifetime, tumbling, sliding) plus combineSum / combineMax / combineMin / combineCounts / combineMerge reducers, snapshot hydration, and the full options reference, see Aggregates.
Gates
Conditional stream activation. On the server, a predicate controls whether the client subscribes. On the client, .when() manages the subscription lifecycle.
// src/live/beta.js
import { live } from 'svelte-realtime/server';
export const betaFeed = live.gate(
(ctx) => ctx.user?.flags?.includes('beta'),
live.stream('beta-feed', async (ctx) => db.betaFeed.latest(50), { merge: 'latest' })
);
<script>
import { betaFeed } from '$live/beta';
import { writable } from 'svelte/store';
const tabActive = writable(true);
const feed = betaFeed.when(tabActive);
</script>
{#if $feed !== undefined}
{#each $feed as item (item.id)}
<p>{item.title}</p>
{/each}
{/if}
When the predicate returns false, the server responds with a graceful no-op (no error, no subscription). The client store stays undefined. .when() accepts a boolean, a Svelte store, or a getter function. When given a store, it subscribes/unsubscribes reactively as the value changes. Getter functions are evaluated once at subscribe time; for reactivity with Svelte 5 $state, wrap in $derived or pass a store.
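The difference between the three condition shapes can be sketched as follows. This is an illustration under assumptions, not the implementation of .when(): `watchCondition` and `tinyWritable` are hypothetical, with `tinyWritable` standing in for a Svelte writable by following the store contract (subscribe calls the callback immediately and returns an unsubscribe function).

```javascript
// Hypothetical: observe a condition of any accepted shape; returns a stop function.
function watchCondition(condition, onChange) {
  if (typeof condition === 'boolean') {
    onChange(condition);
    return () => {};
  }
  if (typeof condition === 'function') {
    onChange(Boolean(condition())); // getter: evaluated once, not tracked
    return () => {};
  }
  // Store: react to every change until stopped.
  return condition.subscribe((value) => onChange(Boolean(value)));
}

// Minimal stand-in for a Svelte writable store.
function tinyWritable(value) {
  const subs = new Set();
  return {
    set(next) {
      value = next;
      subs.forEach((run) => run(value));
    },
    subscribe(run) {
      subs.add(run);
      run(value);
      return () => subs.delete(run);
    }
  };
}

const seen = [];
const tabActive = tinyWritable(true);
const stop = watchCondition(tabActive, (active) => seen.push(active));
tabActive.set(false); // would unsubscribe the stream
stop();
tabActive.set(true);  // ignored after stop()
console.log(seen);    // [ true, false ]

const fromGetter = [];
watchCondition(() => false, (active) => fromGetter.push(active));
console.log(fromGetter); // [ false ]
```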
Pipes
Composable server-side stream transforms. Apply filter, sort, limit, and join operations to both initial data and live events.
// src/live/notifications.js
import { live, pipe } from 'svelte-realtime/server';
export const myNotifications = pipe(
live.stream('notifications', async (ctx) => {
return db.notifications.forUser(ctx.user.id);
}, { merge: 'crud', key: 'id' }),
pipe.filter((ctx, item) => !item.dismissed),
pipe.sort('createdAt', 'desc'),
pipe.limit(20),
pipe.join('authorId', async (id) => db.users.getName(id), 'authorName')
);
| Transform | Initial data | Live events |
|---|---|---|
| pipe.filter(predicate) | Filters the array | Initial data only |
| pipe.sort(field, dir) | Sorts the array | Initial data only |
| pipe.limit(n) | Slices to N items | Initial data only |
| pipe.join(field, resolver, as) | Enriches each item | Initial data only |
Piped functions preserve all stream metadata. The client receives already-transformed data. Import pipe from 'svelte-realtime/server' alongside live.
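For intuition about what the transforms do to the initial data array, here is a plain JavaScript model of filter, sort, and limit applied in order. It is a sketch only: `filter`, `sort`, `limit`, and `applyPipeline` are hypothetical local functions, not the `pipe` helpers themselves, and the asynchronous join step is left out to keep the example short.

```javascript
// Hypothetical step factories; each step maps (ctx, items) to a new array.
const filter = (predicate) => (ctx, items) => items.filter((item) => predicate(ctx, item));

const sort = (field, dir = 'asc') => (ctx, items) =>
  [...items].sort((a, b) => {
    const cmp = a[field] < b[field] ? -1 : a[field] > b[field] ? 1 : 0;
    return dir === 'desc' ? -cmp : cmp;
  });

const limit = (n) => (ctx, items) => items.slice(0, n);

// Run the steps left to right over the loader's result.
function applyPipeline(ctx, items, steps) {
  return steps.reduce((acc, step) => step(ctx, acc), items);
}

const initial = [
  { id: 1, createdAt: 10, dismissed: false },
  { id: 2, createdAt: 30, dismissed: true },
  { id: 3, createdAt: 20, dismissed: false },
  { id: 4, createdAt: 40, dismissed: false }
];

const visible = applyPipeline({}, initial, [
  filter((ctx, item) => !item.dismissed),
  sort('createdAt', 'desc'),
  limit(2)
]);
console.log(visible.map((n) => n.id)); // [ 4, 3 ]
```

Order matters: limiting before filtering would keep dismissed items in the first N and return fewer visible results.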
Under the hood, streams use the adapter’s pub/sub system. See svelte-adapter-uws plugins for replay, presence, and throttle plugins.