Aggregates - live.aggregate()

live.aggregate() is a server-side reactive aggregation primitive. It listens on a source topic, runs reducers on each event, maintains O(1) state, and publishes the rolled-up result to a derived output topic. Clients subscribe to the output topic as a regular stream.

import { live } from 'svelte-realtime/server';

export const orderStats = live.aggregate('orders', {
  count: { init: () => 0, reduce: (acc, event)       => event === 'created' ? acc + 1 : acc },
  total: { init: () => 0, reduce: (acc, event, data) => event === 'created' ? acc + data.amount : acc },
  avg:   { compute: (state) => state.count > 0 ? state.total / state.count : 0 }
}, { topic: 'order-stats' });

<script>
  import { orderStats } from '$live/billing';
</script>
<p>{$orderStats.count} orders / ${$orderStats.total / 100} (avg ${$orderStats.avg / 100})</p>

The signature is live.aggregate(sourceTopic, reducers, options):

| Field | Type | Description |
| --- | --- | --- |
| `sourceTopic` | `string` | Topic the aggregate listens on. Every event published here passes through the reducers. |
| `reducers` | `Record<string, { init?, reduce?, compute? }>` | One entry per output field. init() returns the initial value; reduce(acc, event, data) returns the new value; compute(state) derives a value from the rest of the state on each publish. |
| `options.topic` | `string` | The output topic where rolled-up state is published. |
| `options.debounce` | `number` | Coalesce high-frequency apply calls into one publish per N ms. |
| `options.snapshot` | `() => Promise<state>` | Optional one-shot hydration on registration. |
| `options.windows` | `Record<string, WindowSpec>` | Time-windowed form (see below). |
| `options.snapshots` | `Record<string, () => Promise<state>>` | Per-window hydration when windows is set. |
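
To make the roles of init, reduce, and compute concrete, here is a minimal stand-alone sketch of the fold the table describes. It is not the library's implementation: createState, deriveComputed, and applyEvent are hypothetical helper names, and re-deriving compute fields after every event (rather than only on publish) is a simplifying assumption.

```javascript
// Illustrative only: a plain-JS model of the reducer contract, not svelte-realtime code.
const reducers = {
  count: { init: () => 0, reduce: (acc, event) => (event === 'created' ? acc + 1 : acc) },
  total: { init: () => 0, reduce: (acc, event, data) => (event === 'created' ? acc + data.amount : acc) },
  avg: { compute: (state) => (state.count > 0 ? state.total / state.count : 0) }
};

// Hypothetical helper: re-derive compute() fields from the reduced fields.
function deriveComputed(defs, state) {
  for (const [field, def] of Object.entries(defs)) {
    if (def.compute) state[field] = def.compute(state);
  }
  return state;
}

// Hypothetical helper: build the initial state from every field that has init().
function createState(defs) {
  const state = {};
  for (const [field, def] of Object.entries(defs)) {
    if (def.init) state[field] = def.init();
  }
  return deriveComputed(defs, state);
}

// Hypothetical helper: fold one event into a new state object.
function applyEvent(defs, state, event, data) {
  const next = { ...state };
  for (const [field, def] of Object.entries(defs)) {
    if (def.reduce) next[field] = def.reduce(state[field], event, data);
  }
  return deriveComputed(defs, next);
}

let state = createState(reducers);
state = applyEvent(reducers, state, 'created', { amount: 1000 });
state = applyEvent(reducers, state, 'created', { amount: 3000 });
state = applyEvent(reducers, state, 'deleted', { amount: 1000 }); // ignored by both reducers
console.log(state); // { count: 2, total: 4000, avg: 2000 }
```

The state stays the same size no matter how many events arrive, which is what keeps the aggregate O(1).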

Time windows

options.windows produces multiple windowed aggregates from one input. Each window emits to its own output topic at ${options.topic}:${windowName}.

export const requestStats = live.aggregate('requests', {
  count: { init: () => 0, reduce: (s) => s + 1 }
}, {
  topic: 'requests-stats',
  windows: {
    lifetime: { type: 'lifetime' },
    today:    { type: 'tumbling', period: 'daily', tz: 'America/New_York' },
    last5min: { type: 'sliding', durationMs: 5 * 60_000, slideMs: 30_000 }
  }
});
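
Following the ${options.topic}:${windowName} rule, the three windows in this example would publish to the topics printed below. The helper name windowTopics is hypothetical and exists only to spell out the pattern.

```javascript
// Hypothetical helper that applies the documented `${topic}:${windowName}` pattern.
function windowTopics(topic, windows) {
  return Object.keys(windows).map((name) => `${topic}:${name}`);
}

const topics = windowTopics('requests-stats', {
  lifetime: { type: 'lifetime' },
  today: { type: 'tumbling', period: 'daily', tz: 'America/New_York' },
  last5min: { type: 'sliding', durationMs: 5 * 60_000, slideMs: 30_000 }
});
console.log(topics);
// [ 'requests-stats:lifetime', 'requests-stats:today', 'requests-stats:last5min' ]
```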

The windowed form returns a parent function that throws if called directly; subscribe to its per-window children via the Vite plugin’s generated stubs:

<script>
  import { requestStats } from '$live/metrics';
</script>
<p>Lifetime: {$requestStats.lifetime?.count}</p>
<p>Today:    {$requestStats.today?.count}</p>
<p>5 min:    {$requestStats.last5min?.count}</p>

Window types

| Type | Description |
| --- | --- |
| `lifetime` | Single bucket for the process lifetime. Equivalent to no windows config. |
| `tumbling` | Discrete fixed-period buckets. New bucket starts at the period boundary. Use period: 'minute' \| 'hour' \| 'daily' \| 'monthly' with optional IANA tz, or durationMs + anchor. |
| `sliding` | Hopping window: durationMs total, slideMs between hops. State is summed/merged across Math.ceil(durationMs / slideMs) hop-buckets via a combine reducer. |

Boundaries for named periods use Intl.DateTimeFormat so DST transitions and timezone offsets behave correctly.
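
To illustrate why Intl.DateTimeFormat matters here, the sketch below derives a calendar-day bucket key for a timestamp in a given IANA zone. dailyBucketKey is a hypothetical helper, not part of svelte-realtime, and the library's actual boundary computation may differ.

```javascript
// Hypothetical helper: the calendar day (YYYY-MM-DD) a timestamp falls on in `tz`.
function dailyBucketKey(timestampMs, tz) {
  const parts = new Intl.DateTimeFormat('en-US', {
    timeZone: tz,
    year: 'numeric',
    month: '2-digit',
    day: '2-digit'
  }).formatToParts(new Date(timestampMs));
  const get = (type) => parts.find((p) => p.type === type).value;
  return `${get('year')}-${get('month')}-${get('day')}`;
}

// 03:00 UTC on 1 January is still 31 December in New York (UTC-5 in winter).
const ts = Date.UTC(2024, 0, 1, 3, 0, 0);
console.log(dailyBucketKey(ts, 'UTC'));              // 2024-01-01
console.log(dailyBucketKey(ts, 'America/New_York')); // 2023-12-31
```

The same instant lands in different daily buckets depending on tz, so a daily window with a tz rolls over at local midnight rather than at UTC midnight.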

Combine reducers for sliding windows

For sliding windows, the framework needs to fold across hop-buckets. combine(...buckets) reduces multiple bucket states into one. The bundled built-ins cover the common cases:

import {
  live,
  combineSum,    // numbers add
  combineMax,    // pairwise max
  combineMin,    // pairwise min
  combineCounts, // { [key]: count } merges by sum
  combineMerge   // shallow object merge, last value wins
} from 'svelte-realtime/server';

export const topPaths = live.aggregate('requests:by-path', {
  byPath: {
    init: () => ({}),
    reduce: (acc, _event, { path }) => ({ ...acc, [path]: (acc[path] ?? 0) + 1 })
  }
}, {
  topic: 'top-paths',
  windows: {
    last5min: {
      type: 'sliding',
      durationMs: 5 * 60_000,
      slideMs: 30_000,
      combine: { byPath: combineCounts }
    }
  }
});

Hand-roll a per-field combine for non-trivial reducers:

combine: { byPath: (...buckets) => buckets.reduce((acc, b) => { /* merge b into acc */ return acc; }, {}) }
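
For example, a hand-rolled combine for the byPath field could sum per-key counts across buckets. This sketch is stand-alone: mergeCounts is a hypothetical name, and its behavior is assumed to match what the combineCounts comment describes ("merges by sum"), not copied from the built-in.

```javascript
// Hypothetical per-field combine: sums { [key]: count } maps across hop-buckets.
function mergeCounts(...buckets) {
  return buckets.reduce((acc, bucket) => {
    for (const [key, n] of Object.entries(bucket)) {
      acc[key] = (acc[key] ?? 0) + n;
    }
    return acc;
  }, {});
}

const merged = mergeCounts({ '/': 3, '/docs': 1 }, { '/': 2 }, { '/api': 4 });
console.log(merged); // { '/': 5, '/docs': 1, '/api': 4 }
```

Under that assumption it would be wired in as combine: { byPath: mergeCounts }.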

Per-window options

Each window entry can carry its own debounce (overrides the top-level), snapshot for hydration, or combine map for sliding folds.

Snapshot hydration

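// `reducers` and `redis` (an already-connected client) are defined elsewhere.
export const orderStats = live.aggregate('orders', reducers, {
  topic: 'order-stats',
  // Note: Redis returns hash values as strings.
  snapshot: () => redis.hgetall('order-stats')
});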

snapshot runs once at registration. The returned object is hydrated into state via a helper that copies only own enumerable keys; __proto__, constructor, and prototype are skipped to defend against prototype-pollution attacks via untrusted snapshot payloads (Redis cache, JSON from another service).
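
The key filtering described above can be modelled in a few lines. This is a sketch of the documented behavior, not the library's helper; hydrate and BLOCKED_KEYS are hypothetical names.

```javascript
// Hypothetical helper modelling the documented behaviour; not the library's code.
const BLOCKED_KEYS = new Set(['__proto__', 'constructor', 'prototype']);

function hydrate(state, snapshot) {
  for (const key of Object.keys(snapshot)) { // own enumerable keys only
    if (BLOCKED_KEYS.has(key)) continue;
    state[key] = snapshot[key];
  }
  return state;
}

// JSON.parse creates "__proto__" as an own property, so a naive copy loop
// (state[key] = snapshot[key]) would replace the prototype of `state`.
const untrusted = JSON.parse('{"count": 5, "__proto__": {"isAdmin": true}}');
const state = hydrate({ count: 0, total: 0 }, untrusted);

console.log(state.count);   // 5
console.log(state.isAdmin); // undefined
```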

For windowed aggregates, use snapshots: { [windowName]: () => Promise<state> } instead of a single snapshot.

Capacity

| Constant | Default | Description |
| --- | --- | --- |
| `MAX_AGGREGATE_BUCKETS` | 1000 | Cap on a single sliding window’s hop-bucket count (durationMs / slideMs). Module-load validation refuses to register oversized rings. |

import { MAX_AGGREGATE_BUCKETS } from 'svelte-realtime/server';

A 5-minute sliding window with 30 ms slides would request 10,000 buckets and fail at registration. Pick a slide period long enough to keep the bucket count comfortably under 1000.
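
The arithmetic can be checked before registering a window. In this sketch the constant is redeclared locally with the value from the table so the snippet runs on its own; hopBucketCount and fitsBucketCap are hypothetical helpers, and treating the cap as an inclusive upper bound is an assumption.

```javascript
// Value taken from the table above; the real constant is exported by the library.
const MAX_AGGREGATE_BUCKETS = 1000;

// Hypothetical helper: number of hop-buckets a sliding window needs.
function hopBucketCount(durationMs, slideMs) {
  return Math.ceil(durationMs / slideMs);
}

// Hypothetical pre-flight check mirroring the documented limit.
function fitsBucketCap(durationMs, slideMs) {
  return hopBucketCount(durationMs, slideMs) <= MAX_AGGREGATE_BUCKETS;
}

console.log(hopBucketCount(5 * 60_000, 30_000)); // 10
console.log(hopBucketCount(5 * 60_000, 30));     // 10000
console.log(fitsBucketCap(5 * 60_000, 30));      // false
console.log(fitsBucketCap(5 * 60_000, 500));     // true (600 buckets)
```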

Debounce

debounce: <ms> coalesces high-frequency reducer calls into one publish per N ms. Default 0 (every event triggers a publish).

live.aggregate('positions', reducers, {
  topic: 'position-stats',
  debounce: 16  // caps publishes at roughly 60 Hz (1000 / 16 = 62.5 per second)
});
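
One way to picture the coalescing is a publisher that remembers only the newest state while a timer is pending. The sketch below is a model under that assumption, not the library's implementation; createCoalescingPublisher is a hypothetical name, and the scheduler is injectable so the example can be driven without real timers.

```javascript
// Hypothetical model of debounced publishing; not the library's implementation.
// `schedule` defaults to setTimeout and is injectable so the sketch can be driven by hand.
function createCoalescingPublisher(publish, debounceMs, schedule = setTimeout) {
  let latest;
  let pending = false;
  return function apply(state) {
    if (debounceMs <= 0) return publish(state); // default: publish on every event
    latest = state;
    if (pending) return;                        // a publish is already scheduled
    pending = true;
    schedule(() => {
      pending = false;
      publish(latest);                          // only the newest state goes out
    }, debounceMs);
  };
}

// Drive it with a manual scheduler instead of real timers.
const published = [];
const timers = [];
const apply = createCoalescingPublisher((s) => published.push(s), 16, (fn) => timers.push(fn));

apply({ count: 1 });
apply({ count: 2 });
apply({ count: 3 });
timers.shift()();       // the 16 ms timer fires
console.log(published); // [ { count: 3 } ]
```

Three reducer calls produce a single publish carrying the latest state, which is the trade-off debounce makes: fewer messages in exchange for up to N ms of added latency.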
