Connection Registry

createConnectionRegistry(redis, options) is the cluster-wide counterpart to platform.request / platform.send / platform.sendCoalesced / platform.sendTo. It tracks userId -> {instanceId, sessionId, ts} so server-initiated traffic finds the right instance.

live.push(target, ...) and live.notify(target, ...) consume this registry via live.configurePush({ remoteRegistry }).

Setup

import { createRedisClient } from 'svelte-adapter-uws-extensions/redis';
import { createConnectionRegistry } from 'svelte-adapter-uws-extensions/redis/registry';
import { live } from 'svelte-realtime/server';

const redis = createRedisClient();
const registry = await createConnectionRegistry(redis, {
  identify: (ws) => ws.getUserData()?.userId
});

// hooks.ws.js
export async function init({ platform }) {
  live.configurePush({ remoteRegistry: registry });
}

identify(ws) is the only required option: given a WebSocket, return a stable user-scoped id (userId, tenantUserId, etc.). The registry keeps one entry per user, and the most recently seen connection wins.

Usage

import { live } from 'svelte-realtime/server';

const reply = await live.push(targetUserId, 'confirm-action', { action: 'delete' });
await live.notify(targetUserId, 'invalidate', { resource: 'orders' });

The realtime wrappers route through registry.request / registry.send under the hood. Use the lower-level methods directly if you need cross-instance targeting outside of live.push / live.notify:

const reply = await registry.request(targetUserId, 'confirm', payload, { timeoutMs: 5000 });
await registry.send(targetUserId, 'topic', 'event', data);
await registry.sendCoalesced(targetUserId, { key, topic, event, data });
const matched = await registry.sendTo({ tenantId: 't1' }, 'topic', 'event', data);

Self-targeting short-circuits to the local platform; cross-instance hops go through the {prefix}__push:{instanceId} channel.
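The routing decision described above can be sketched as a single conditional. This is a minimal illustration, not the library's implementation: createDispatcher, lookup, localSend, and publish are hypothetical stand-ins for the registry lookup, the local platform.send, and a Redis PUBLISH, and the 'offline' / 'local' / 'remote' return values are assumptions made for the example.

```javascript
// Sketch only: every name here is hypothetical and injected by the caller.
function createDispatcher({ instanceId, prefix = '', lookup, localSend, publish }) {
  return async function send(userId, topic, event, data) {
    const entry = await lookup(userId);
    if (!entry) return 'offline'; // no registry entry for this user

    // Self-targeting: this instance owns the user, so skip Redis entirely.
    if (entry.instanceId === instanceId) {
      localSend(userId, topic, event, data);
      return 'local';
    }

    // Cross-instance hop: publish to the owning instance's push channel.
    const channel = `${prefix}__push:${entry.instanceId}`;
    await publish(channel, JSON.stringify({ userId, topic, event, data }));
    return 'remote';
  };
}
```

Keeping the check inside the dispatcher means callers use the same call whether the target is local or remote.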

Options

| Option | Default | Description |
| --- | --- | --- |
| identify | required | (ws) => userId \| null. Anonymous connections (returning null/undefined) are skipped and are not addressable through the registry. |
| attributes | unset | (ws) => Record<string, string \| number \| boolean>. Required for sendTo(...). Captures per-user attributes at registration time. Numbers and booleans are coerced to strings for index-key consistency; nested objects, arrays, and null values are dropped (shallow only). |
| keyPrefix | '' | Prefix prepended to all registry keys and channels. Stacks with the underlying client’s keyPrefix. |
| ttl | 90 | Expiry on registry entries in seconds. Should be at least three heartbeat intervals (heartbeat is in ms: the default 30000 ms is 30 s, against a 90 s ttl) so a missed beat does not drop a live user. |
| heartbeat | 30000 | TTL refresh interval in ms. Each tick EXPIREs every locally-owned entry. |
| requestTimeoutMs | 5000 | Default timeout for request(...) in ms. Overridable per-call via options.timeoutMs. |
| breaker | unset | Optional circuit breaker for Redis ops. |
| metrics | unset | Optional Prometheus metrics registry. |
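Because ttl is given in seconds and heartbeat in milliseconds, the two are easy to misconfigure together. The helper below is a hypothetical sanity check (checkRegistryTiming is not part of the library) that reads the guideline as "ttl should cover at least three heartbeat intervals", which is what the defaults satisfy.

```javascript
// Hypothetical helper: compares ttl (seconds) against heartbeat (milliseconds).
function checkRegistryTiming({ ttl = 90, heartbeat = 30000 } = {}) {
  const heartbeatSeconds = heartbeat / 1000;
  // How many heartbeat intervals fit into one TTL window.
  const beatsPerTtl = ttl / heartbeatSeconds;
  return { heartbeatSeconds, beatsPerTtl, ok: beatsPerTtl >= 3 };
}
```

With the defaults this reports three intervals per TTL window; lowering ttl to 60 while leaving heartbeat at 30000 would be flagged.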

API

| Method | Description |
| --- | --- |
| lookup(userId) | Resolve a userId to its entry {instanceId, sessionId, ts} or null. |
| request(target, event, data?, opts?) | Cluster-routed request/reply. Resolves with the reply. |
| send(target, topic, event, data?) | Cluster-routed platform.send counterpart. Fire-and-forget. |
| sendCoalesced(target, { key, topic, event, data }) | Cluster-routed coalesce-by-key send. Latest-value wins per (connection, key). |
| sendTo(criteria, topic, event, data?) | Attribute-targeted broadcast across the cluster. Requires attributes option. |
| size() | Count of users registered to THIS instance (scrape-time, local view). |
| instanceId | Stable id for this instance, also the name of its push channel. |
| hooks.open / hooks.close | Ready-made WebSocket hooks. |
| destroy() | Stop the heartbeat timer and Redis subscriber. |

Attribute-targeted broadcast: sendTo

sendTo(criteria, topic, event, data) is the cluster-routed counterpart to platform.sendTo(filter, ...) - except it matches on attribute values, not a filter function. The attributes(ws) option captures per-user attributes at registration time and indexes them in memory on every instance; sendTo resolves matches against the local index, groups by owning instance, and fires one envelope per owning instance.

import { createConnectionRegistry } from 'svelte-adapter-uws-extensions/redis/registry';

const registry = await createConnectionRegistry(redis, {
  identify: (ws) => ws.getUserData()?.userId,
  attributes: (ws) => {
    const ud = ws.getUserData();
    return { role: ud.role, region: ud.region };
  }
});

// Broadcast to every connection where attributes.role === 'admin':
await registry.sendTo({ role: 'admin' }, 'alerts', 'warning', { message: 'High load' });

// Compound match - AND across keys:
await registry.sendTo({ role: 'admin', region: 'eu' }, 'audit', 'created', payload);

platform.sendTo accepts a filter function; the cluster shape is shallow equality only: one literal value per attribute key, AND across keys. Functions do not serialize across instances, so the filter-function escape hatch is deliberately not lifted here. No regex, no array containment, no nested-object queries. Apps that need richer queries should publish to a dedicated topic and route their own broadcasts.
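The matching rule can be illustrated with two small functions. This is a sketch under assumptions: normalizeAttributes and matchesCriteria are hypothetical names, and coercing criteria values the same way as stored attributes is an assumption made so that a numeric criterion can match a stringified attribute.

```javascript
// Shallow normalization: keep strings, stringify numbers and booleans,
// drop everything else (null, undefined, arrays, nested objects).
function normalizeAttributes(raw) {
  const out = {};
  for (const [key, value] of Object.entries(raw ?? {})) {
    const type = typeof value;
    if (type === 'string') out[key] = value;
    else if (type === 'number' || type === 'boolean') out[key] = String(value);
  }
  return out;
}

// One literal value per key, AND across keys.
// Note: an empty criteria object matches everything in this sketch;
// how the real registry treats that case is not specified here.
function matchesCriteria(attrs, criteria) {
  const wanted = normalizeAttributes(criteria);
  return Object.entries(wanted).every(([key, value]) => attrs[key] === value);
}
```

Because the comparison is plain string equality, the criteria object serializes cleanly across instances, which a filter function would not.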

How it works. Each instance maintains an in-memory secondary index Map<attrKey, Map<attrValue, Set<userId>>> plus a shadow userId -> attrs map. Updates ride a shared {prefix}__registry-events Redis pub/sub channel: every successful hooks.open publishes {type:'open', userId, instanceId, attrs}, every close publishes {type:'close', ...}. A new instance bootstraps by SCANing existing {prefix}conns:* hashes and HGETALL-ing each to populate from live state. Subscribe-first / SCAN-second keeps the bootstrap race-tolerant. Receivers re-resolve matches against their own authoritative local index before calling platform.send, so one round of sender-index staleness from a fast user migration is tolerated.
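The index shape described above can be sketched as follows. AttributeIndex is a hypothetical class, not the library's code; the event fields mirror the {type, userId, instanceId, attrs} messages described above, and ignoring a close event that names a different instance than the current owner is an assumption made here to model last-seen-connection-wins.

```javascript
class AttributeIndex {
  constructor() {
    this.byAttr = new Map(); // attrKey -> Map<attrValue, Set<userId>>
    this.byUser = new Map(); // shadow map: userId -> { instanceId, attrs }
  }

  // Apply one registry event ({ type: 'open' | 'close', userId, instanceId, attrs }).
  apply(evt) {
    if (evt.type === 'open') {
      this._remove(evt.userId); // a reconnect replaces the previous entry
      this.byUser.set(evt.userId, { instanceId: evt.instanceId, attrs: evt.attrs ?? {} });
      for (const [key, value] of Object.entries(evt.attrs ?? {})) {
        if (!this.byAttr.has(key)) this.byAttr.set(key, new Map());
        const values = this.byAttr.get(key);
        if (!values.has(String(value))) values.set(String(value), new Set());
        values.get(String(value)).add(evt.userId);
      }
    } else if (evt.type === 'close') {
      // Assumption: a stale close from a previous owner must not evict the user.
      const current = this.byUser.get(evt.userId);
      if (current && current.instanceId === evt.instanceId) this._remove(evt.userId);
    }
  }

  _remove(userId) {
    const prev = this.byUser.get(userId);
    if (!prev) return;
    this.byUser.delete(userId);
    for (const [key, value] of Object.entries(prev.attrs)) {
      const values = this.byAttr.get(key);
      const users = values?.get(String(value));
      if (!users) continue;
      users.delete(userId);
      if (users.size === 0) values.delete(String(value));
      if (values.size === 0) this.byAttr.delete(key);
    }
  }

  // Intersect the per-key sets (AND across keys), then group by owning instance.
  resolve(criteria) {
    const byInstance = new Map();
    let matched = null;
    for (const [key, value] of Object.entries(criteria)) {
      const users = this.byAttr.get(key)?.get(String(value)) ?? new Set();
      matched = matched === null
        ? new Set(users)
        : new Set([...matched].filter((id) => users.has(id)));
    }
    for (const userId of matched ?? []) {
      const { instanceId } = this.byUser.get(userId);
      if (!byInstance.has(instanceId)) byInstance.set(instanceId, []);
      byInstance.get(instanceId).push(userId);
    }
    return byInstance; // Map<instanceId, userId[]>
  }
}
```

Grouping the result by instance is what allows one envelope per owning instance rather than one per user.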

sendTo is fire-and-forget. No reply, no per-target outcome. The push_sendto_total{result} counter records sender-side outcomes:

| result | Meaning |
| --- | --- |
| ok | At least one match resolved; envelopes published successfully (or only self-matches delivered locally). |
| empty | No matches resolved by the local index. The call no-ops. |
| error | At least one Redis publish failed; partial delivery still occurred for the successful publishes. |
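The three outcomes can be illustrated with a small fan-out routine. This is a sketch, not the library's code: fanOut, deliverLocal, and publish are hypothetical, and byInstance is assumed to be a Map of instanceId to matched userIds.

```javascript
// Returns 'empty', 'ok', or 'error' following the sender-side meanings above.
async function fanOut(byInstance, { selfId, deliverLocal, publish }) {
  if (byInstance.size === 0) return 'empty'; // nothing matched: no-op

  const remote = [];
  for (const [instanceId, userIds] of byInstance) {
    if (instanceId === selfId) {
      deliverLocal(userIds); // self-matches never touch Redis
    } else {
      // One envelope per owning instance; wrap so sync throws become rejections.
      remote.push(Promise.resolve().then(() => publish(instanceId, userIds)));
    }
  }

  // allSettled lets successful publishes stand even if another one fails.
  const settled = await Promise.allSettled(remote);
  return settled.some((r) => r.status === 'rejected') ? 'error' : 'ok';
}
```

Using Promise.allSettled rather than Promise.all matches the partial-delivery behaviour described for the error outcome.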

Eventual consistency

  • Mid-flight migration. A user reconnecting on a different instance between the sender’s index lookup and delivery on the receiving instance can cause a single missed delivery (best-effort) while the registry-events channel propagates the migration.
  • Pub/sub disconnect. If the registry’s subscriber connection drops, in-memory indexes stop updating; the Redis hash’s attrs field stays authoritative for lookup(userId). The next ensureSubscriber() (e.g., on next hooks.open) re-bootstraps via SCAN.
  • Topics outside the bus. Only topics published via platform.send / platform.publish reach receiving connections; sendTo does NOT subscribe target users to the topic, so a matched user with no active subscription still receives the event (per-connection, not per-topic).

For exact targeting (audit log, billing, transactional broadcasts), use request(...) per userId or fan out via topic subscribers with the bus.
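A per-user fan-out with explicit outcomes might look like the sketch below. requestEach is a hypothetical helper; it relies only on the registry.request(target, event, data, { timeoutMs }) call shown earlier and assumes that a failed or timed-out request rejects its promise.

```javascript
// Send one request per userId and report a per-target outcome.
async function requestEach(registry, userIds, event, data, timeoutMs = 5000) {
  const settled = await Promise.allSettled(
    userIds.map((userId) => registry.request(userId, event, data, { timeoutMs }))
  );
  return userIds.map((userId, i) => {
    const result = settled[i];
    return result.status === 'fulfilled'
      ? { userId, ok: true, reply: result.value }
      : { userId, ok: false, error: result.reason };
  });
}
```

Unlike sendTo, every target ends up with a recorded reply or error, at the cost of one round trip per user.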

Edge cases

  • User reconnects to a different instance mid-request. The origin’s pending entry waits on the OLD instance’s reply channel. The user’s new connection won’t reply there; the request times out. Late replies from the old instance after teardown silently drop (push_late_replies_total increments).
  • Owning instance crashes between request publish and local platform.request. Same shape: the request times out. The Redis entry survives until the TTL expires (the heartbeat that kept refreshing it stopped with the dead instance), after which request(...) sees result="offline".
  • Self-targeting. When the origin instance owns the user, request / send / sendCoalesced short-circuit to a local platform.* without round-tripping Redis. One conditional in the dispatcher, not a special case at the API surface.
  • Anonymous connections. identify(ws) returning null / undefined makes the open / close hooks no-op.
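The timeout and late-reply behaviour in the first two cases can be modelled with a pending-request table. This is a sketch of the general pattern, not the library's internals: createPendingRequests and onLateReply are hypothetical, with onLateReply standing in for incrementing push_late_replies_total.

```javascript
function createPendingRequests({ onLateReply = () => {} } = {}) {
  const pending = new Map(); // requestId -> { resolve, timer }
  let nextId = 0;

  return {
    // Register a request; the promise rejects if no reply arrives in time.
    create(timeoutMs) {
      const id = String(++nextId);
      const promise = new Promise((resolve, reject) => {
        const timer = setTimeout(() => {
          pending.delete(id); // teardown: later replies find nothing
          reject(new Error('request timed out'));
        }, timeoutMs);
        pending.set(id, { resolve, timer });
      });
      return { id, promise };
    },

    // Called when a reply arrives on the reply channel.
    settle(id, reply) {
      const entry = pending.get(id);
      if (!entry) {
        onLateReply(id); // reply after teardown: count it and drop it
        return false;
      }
      clearTimeout(entry.timer);
      pending.delete(id);
      entry.resolve(reply);
      return true;
    }
  };
}
```

A reply that arrives after the timer fired finds no pending entry, so it is counted and discarded rather than resolving anything.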

Metrics

| Metric | Type | Labels |
| --- | --- | --- |
| push_requests_total | counter | result |
| push_reply_latency_ms | histogram | - |
| push_registry_size | gauge | - |
| push_late_replies_total | counter | - |
| push_sends_total | counter | result |
| push_coalesced_total | counter | result |
| push_sendto_total | counter | result |
