Aggregates - live.aggregate()
live.aggregate() is a server-side reactive aggregation primitive. It listens on a source topic, runs reducers on each event, maintains O(1) state, and publishes the rolled-up result to a derived output topic. Clients subscribe to the output topic as a regular stream.

```javascript
import { live } from 'svelte-realtime/server';

export const orderStats = live.aggregate('orders', {
  count: { init: () => 0, reduce: (acc, event) => event === 'created' ? acc + 1 : acc },
  total: { init: () => 0, reduce: (acc, event, data) => event === 'created' ? acc + data.amount : acc },
  avg: { compute: (state) => state.count > 0 ? state.total / state.count : 0 }
}, { topic: 'order-stats' });
```

```svelte
<script>
  import { orderStats } from '$live/billing';
</script>

<p>{$orderStats.count} orders / ${$orderStats.total / 100} (avg ${$orderStats.avg / 100})</p>
```

The signature is live.aggregate(sourceTopic, reducers, options):

| Field | Type | Description |
|---|---|---|
| sourceTopic | string | Topic the aggregate listens on. Every event published here passes through the reducers. |
| reducers | Record<string, { init?, reduce?, compute? }> | One entry per output field. init() returns the initial value; reduce(acc, event, data) returns the new value; compute(state) derives a value from the rest of the state on each publish. |
| options.topic | string | The output topic where rolled-up state is published. |
| options.debounce | number | Coalesce high-frequency apply calls into one publish per N ms. |
| options.snapshot | () => Promise<state> | Optional one-shot hydration on registration. |
| options.windows | Record<string, WindowSpec> | Time-windowed form (see below). |
| options.snapshots | Record<string, () => Promise<state>> | Per-window hydration when windows is set. |
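
To make the three reducer hooks concrete, the sketch below simulates in plain JavaScript how a reducer map with this shape might be folded over a few events. It does not call svelte-realtime: createState, applyEvent, and project are hypothetical helpers written for illustration, and the library's actual internals may differ.

```javascript
// Reducer map with the same shape as the orderStats example.
const reducers = {
  count: { init: () => 0, reduce: (acc, event) => (event === 'created' ? acc + 1 : acc) },
  total: { init: () => 0, reduce: (acc, event, data) => (event === 'created' ? acc + data.amount : acc) },
  avg: { compute: (state) => (state.count > 0 ? state.total / state.count : 0) }
};

// Hypothetical helper: build the initial state from every field that has init().
function createState(reducerMap) {
  const state = {};
  for (const [field, r] of Object.entries(reducerMap)) {
    if (r.init) state[field] = r.init();
  }
  return state;
}

// Hypothetical helper: run every reduce() once for a single event.
function applyEvent(reducerMap, state, event, data) {
  for (const [field, r] of Object.entries(reducerMap)) {
    if (r.reduce) state[field] = r.reduce(state[field], event, data);
  }
  return state;
}

// Hypothetical helper: derive the compute() fields just before publishing.
function project(reducerMap, state) {
  const out = { ...state };
  for (const [field, r] of Object.entries(reducerMap)) {
    if (r.compute) out[field] = r.compute(out);
  }
  return out;
}

const orderState = createState(reducers);
applyEvent(reducers, orderState, 'created', { amount: 1200 });
applyEvent(reducers, orderState, 'created', { amount: 800 });
applyEvent(reducers, orderState, 'updated', { amount: 999 }); // ignored by both reducers

const rolledUp = project(reducers, orderState);
console.log(rolledUp); // { count: 2, total: 2000, avg: 1000 }
```

The stored state stays the same size however many events arrive, which is the O(1) property described at the top of the page; avg is never stored, only derived when the state is projected.
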
Time windows
options.windows produces multiple windowed aggregates from one input. Each window emits to its own output topic at ${options.topic}:${windowName}.

```javascript
export const requestStats = live.aggregate('requests', {
  count: { init: () => 0, reduce: (s) => s + 1 }
}, {
  topic: 'requests-stats',
  windows: {
    lifetime: { type: 'lifetime' },
    today: { type: 'tumbling', period: 'daily', tz: 'America/New_York' },
    last5min: { type: 'sliding', durationMs: 5 * 60_000, slideMs: 30_000 }
  }
});
```

The windowed form returns a parent function that throws if called directly; subscribe to its per-window children via the Vite plugin’s generated stubs:

```svelte
<script>
  import { requestStats } from '$live/metrics';
</script>

<p>Lifetime: {$requestStats.lifetime?.count}</p>
<p>Today: {$requestStats.today?.count}</p>
<p>5 min: {$requestStats.last5min?.count}</p>
```

Window types
| Type | Description |
|---|---|
| lifetime | Single bucket for the process lifetime. Equivalent to no windows config. |
| tumbling | Discrete fixed-period buckets. New bucket starts at the period boundary. Use period: 'minute' \| 'hour' \| 'daily' \| 'monthly' with optional IANA tz, or durationMs + anchor. |
| sliding | Hopping window: durationMs total, slideMs between hops. State is summed/merged across Math.ceil(durationMs / slideMs) hop-buckets via a combine reducer. |

Boundaries for named periods use Intl.DateTimeFormat so DST transitions and timezone offsets behave correctly.
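
As an illustration of why timezone-aware boundaries matter, the sketch below derives a calendar-day bucket key from a timestamp using Intl.DateTimeFormat. dayBucketKey is a hypothetical helper, not part of svelte-realtime, and the library's own boundary logic may be implemented differently.

```javascript
// Hypothetical helper: map an instant to its local calendar day in an IANA timezone.
function dayBucketKey(timestampMs, tz) {
  const parts = new Intl.DateTimeFormat('en-US', {
    timeZone: tz,
    year: 'numeric',
    month: '2-digit',
    day: '2-digit'
  }).formatToParts(new Date(timestampMs));
  const get = (type) => parts.find((p) => p.type === type).value;
  return `${get('year')}-${get('month')}-${get('day')}`;
}

// Two instants two hours apart, on either side of local midnight in New York.
const beforeMidnight = Date.UTC(2024, 2, 10, 4, 30); // 2024-03-09 23:30 EST
const afterMidnight = Date.UTC(2024, 2, 10, 6, 30);  // 2024-03-10 01:30 EST

console.log(dayBucketKey(beforeMidnight, 'America/New_York')); // 2024-03-09
console.log(dayBucketKey(afterMidnight, 'America/New_York'));  // 2024-03-10
console.log(dayBucketKey(beforeMidnight, 'UTC'));              // 2024-03-10
```

Both instants share a UTC day, but in America/New_York they fall into different daily buckets, which is the behaviour a tz option on a tumbling window is meant to capture.
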
Combine reducers for sliding windows
For sliding windows, the framework needs to fold across hop-buckets. combine(...buckets) reduces multiple bucket states into one. The bundled built-ins cover the common cases:

```javascript
import {
  live,
  combineSum,    // numbers add
  combineMax,    // pairwise max
  combineMin,    // pairwise min
  combineCounts, // { [key]: count } merges by sum
  combineMerge   // shallow object merge, last value wins
} from 'svelte-realtime/server';

export const topPaths = live.aggregate('requests:by-path', {
  byPath: {
    init: () => ({}),
    reduce: (acc, _event, { path }) => ({ ...acc, [path]: (acc[path] ?? 0) + 1 })
  }
}, {
  topic: 'top-paths',
  windows: {
    last5min: {
      type: 'sliding',
      durationMs: 5 * 60_000,
      slideMs: 30_000,
      combine: { byPath: combineCounts }
    }
  }
});
```

Hand-roll a per-field combine for non-trivial reducers:

```javascript
combine: { byPath: (...buckets) => buckets.reduce((acc, b) => /* merge logic */, {}) }
```

Per-window options
Each window entry can carry its own debounce (overrides the top-level), snapshot for hydration, or combine map for sliding folds.
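
For the combine map mentioned above, the following sketch shows what a hand-rolled per-field combine could look like when folding hop-buckets of per-path counts. combineByPath and the sample hopBuckets are hypothetical; the sketch runs without svelte-realtime and only illustrates the (...buckets) calling convention described on this page.

```javascript
// Hypothetical per-field combine: sum per-key counts across hop-buckets.
const combineByPath = (...buckets) =>
  buckets.reduce((acc, bucket) => {
    for (const [path, n] of Object.entries(bucket)) {
      acc[path] = (acc[path] ?? 0) + n;
    }
    return acc;
  }, {});

// Three hop-buckets as a sliding window might hold them, oldest first.
const hopBuckets = [
  { '/': 3, '/docs': 1 },
  { '/': 2 },
  { '/docs': 4, '/pricing': 1 }
];

const windowState = combineByPath(...hopBuckets);
console.log(windowState); // { '/': 5, '/docs': 5, '/pricing': 1 }
```

A function like this would be supplied per field inside a window entry, for example combine: { byPath: combineByPath }, in the same position where the earlier example passes combineCounts.
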
Snapshot hydration

```javascript
// Assumes `reducers` is a reducer map like the one in the first example
// and `redis` is your own Redis client instance.
export const orderStats = live.aggregate('orders', reducers, {
  topic: 'order-stats',
  snapshot: () => redis.hgetall('order-stats')
});
```

snapshot runs once at registration. The returned object is hydrated into state via a helper that copies only own enumerable keys; __proto__, constructor, and prototype are skipped to defend against prototype-pollution attacks via untrusted snapshot payloads (Redis cache, JSON from another service).
For windowed aggregates, use snapshots: { [windowName]: () => Promise<state> } instead of a single snapshot.
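
The key-filtering rule described above can be sketched in a few lines. hydrate and BLOCKED_KEYS are hypothetical names for illustration; the library's real helper is not shown on this page and may differ in detail.

```javascript
// Keys that are never copied from a snapshot payload.
const BLOCKED_KEYS = new Set(['__proto__', 'constructor', 'prototype']);

// Hypothetical helper: copy own enumerable keys from a snapshot into state.
function hydrate(state, snapshot) {
  for (const key of Object.keys(snapshot)) {
    if (BLOCKED_KEYS.has(key)) continue;
    state[key] = snapshot[key];
  }
  return state;
}

// JSON.parse creates "__proto__" as an own property; a naive key-by-key copy
// would assign it and silently swap the target's prototype.
const untrusted = JSON.parse('{"count": 42, "__proto__": {"polluted": true}}');
const hydrated = hydrate({ count: 0, total: 0 }, untrusted);

console.log(hydrated);          // { count: 42, total: 0 }
console.log(hydrated.polluted); // undefined
```
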
Capacity
| Constant | Default | Description |
|---|---|---|
| MAX_AGGREGATE_BUCKETS | 1000 | Cap on a single sliding window’s hop-bucket count (durationMs / slideMs). Module-load validation refuses to register oversized rings. |

```javascript
import { MAX_AGGREGATE_BUCKETS } from 'svelte-realtime/server';
```

A 5-minute sliding window with 30 ms slides would request 10,000 buckets and fail at registration. Pick a slide period long enough to keep the bucket count comfortably under 1000.
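
The arithmetic behind that limit can be checked before declaring a window. In the sketch below, hopBucketCount and fitsCapacity are hypothetical helpers, and MAX_BUCKETS is a local stand-in for the documented default rather than the library's exported constant. Whether exactly 1000 buckets is accepted is not stated on this page; the sketch assumes the cap is inclusive.

```javascript
// Local stand-in for the documented default of MAX_AGGREGATE_BUCKETS.
const MAX_BUCKETS = 1000;

// Hop-bucket count for a sliding window: Math.ceil(durationMs / slideMs).
function hopBucketCount(durationMs, slideMs) {
  return Math.ceil(durationMs / slideMs);
}

// Hypothetical pre-flight check (assumes the cap is inclusive).
function fitsCapacity(durationMs, slideMs, max = MAX_BUCKETS) {
  return hopBucketCount(durationMs, slideMs) <= max;
}

console.log(hopBucketCount(5 * 60_000, 30_000)); // 10
console.log(hopBucketCount(5 * 60_000, 30));     // 10000
console.log(fitsCapacity(5 * 60_000, 30_000));   // true
console.log(fitsCapacity(5 * 60_000, 30));       // false
```
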
Debounce
debounce: <ms> coalesces high-frequency reducer calls into one publish per N ms. Default 0 (every event triggers a publish).

```javascript
live.aggregate('positions', reducers, {
  topic: 'position-stats',
  debounce: 16 // at most one publish per 16 ms, roughly a 60 Hz cap
});
```
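
To show what coalescing means in practice, here is a minimal sketch of a publisher that collapses bursts of updates into one publish per interval. createCoalescedPublisher is a hypothetical helper, and the trailing-edge timing is an assumption; the page does not say how the library schedules its debounced publishes. A fake scheduler is injected so the example runs synchronously.

```javascript
// Hypothetical trailing-edge coalescer: many apply() calls, one publish per interval.
function createCoalescedPublisher(publish, ms, schedule = setTimeout) {
  let latest;
  let pending = false;
  return function apply(state) {
    latest = state; // always keep the newest state
    if (ms === 0) {
      publish(latest); // debounce 0: every event triggers a publish
      return;
    }
    if (pending) return; // a publish is already scheduled
    pending = true;
    schedule(() => {
      pending = false;
      publish(latest);
    }, ms);
  };
}

// Fake scheduler: collect timer callbacks instead of waiting for real time.
const timers = [];
const sent = [];
const apply = createCoalescedPublisher((s) => sent.push(s), 16, (fn) => timers.push(fn));

apply({ count: 1 });
apply({ count: 2 });
apply({ count: 3 });
timers.shift()(); // the single 16 ms timer fires

console.log(sent); // [ { count: 3 } ]
```

Three updates inside one interval produce a single publish carrying the newest state, so subscribers see fewer messages without missing the final value.
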