Admission Control

Two-tier admission keeps a saturated cluster from amplifying load.

| Tier | Where | What it does |
| --- | --- | --- |
| Handshake | adapter WebSocketOptions.upgradeAdmission | Caps concurrent connections (maxConcurrent) and paces accepted upgrades (perTickBudget). Fast-fails with 503 past the cap. |
| Message | extensions createAdmissionControl | Per-class rules consulted at message dispatch; accept / reject based on backpressure signals or custom predicates. |

The handshake tier protects the listen socket; the message tier protects expensive code paths once a connection is open.
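A handshake-tier sketch follows. The adapter package name and the exact option nesting are assumptions; consult the adapter's WebSocketOptions reference for the real shape:

// svelte.config.js - handshake-tier sketch (option nesting assumed)
import adapter from 'svelte-adapter-uws'; // assumed adapter package name

export default {
  kit: {
    adapter: adapter({
      websocket: {
        upgradeAdmission: {
          maxConcurrent: 10_000, // fast-fail new upgrades with 503 past this cap
          perTickBudget: 50      // accept at most this many upgrades per event-loop tick
        }
      }
    })
  }
};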

Setup

import { createAdmissionControl } from 'svelte-adapter-uws-extensions/admission';
import { createPublishRateAggregator } from 'svelte-adapter-uws-extensions/redis/publish-rate';
import { redis } from './redis.js';

const rates = createPublishRateAggregator(redis);

const admission = createAdmissionControl({
  rules: {
    // Reject when platform pressure reports any of the listed reasons:
    'expensive-rpc': ['MEMORY', 'PUBLISH_RATE'],
    // Reject when the caller is a cluster-wide top publisher:
    'cluster-broadcast': { clusterTopPublisher: { rates, threshold: 1000 } },
    // Custom predicate - accept while the instance has headroom:
    'admin-only': (className, platform) => platform.connections < 1000
  }
});

Usage

import { createMessage, LiveError } from 'svelte-realtime/server';

export const message = createMessage({
  async beforeExecute(ws, rpcPath) {
    // classOf is an app-defined mapping from rpc path to admission class;
    // platform is the adapter platform object available in this scope.
    const className = classOf(rpcPath);
    if (!(await admission.shouldAccept(className, platform))) {
      throw new LiveError('OVERLOADED', 'Server is shedding load');
    }
  }
});

Rule shapes

A rule is one of:

  • Array of pressure reasons - ['MEMORY', 'PUBLISH_RATE', 'SUBSCRIBERS']. Rejects when platform.pressure.reason matches any listed reason.
  • Predicate - (className, platform) => boolean | Promise<boolean>. Custom logic; see the sketch after this list.
  • clusterTopPublisher - { rates, threshold }. Rejects when the calling user is among the top-N publishers across the cluster (use with createPublishRateAggregator).
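For example, a predicate rule can combine the pressure signal with custom state. A sketch only - the class name is illustrative and the platform fields follow the descriptions above:

const admission = createAdmissionControl({
  rules: {
    // Accept unless the instance reports memory pressure while busy:
    'heavy-query': (className, platform) =>
      platform.pressure?.reason !== 'MEMORY' || platform.connections < 5000
  }
});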

Metrics

| Metric | Type | Labels |
| --- | --- | --- |
| admission_accepted_total | counter | class |
| admission_rejected_total | counter | class, reason |

Cluster-wide publish-rate aggregator

createPublishRateAggregator(redis, options?) exposes a cluster-wide top-publisher view that powers the clusterTopPublisher admission rule. Each instance broadcasts its own platform.pressure.topPublishers slice on a Redis pub/sub channel; every instance maintains a sliding-window view of all instances’ slices and merges them into a cluster-wide top-N. No leader election - each instance is its own aggregator. Storage cost is O(instances * topN) per instance, bounded and small.
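An illustrative sketch of the merge each instance performs (not the library source; slices is the per-instanceId store described under Wire envelope below, with this instance's own fresh slice included):

function mergeTopPublishers(slices, { staleAfter, topN }, now = Date.now()) {
  const byTopic = new Map();
  for (const { ts, slice } of slices.values()) {
    if (now - ts > staleAfter) continue; // drop stale contributors
    for (const { topic, messagesPerSec, bytesPerSec } of slice) {
      const agg = byTopic.get(topic) ??
        { topic, messagesPerSec: 0, bytesPerSec: 0, contributingInstances: 0 };
      agg.messagesPerSec += messagesPerSec;
      agg.bytesPerSec += bytesPerSec;
      agg.contributingInstances += 1;
      byTopic.set(topic, agg);
    }
  }
  return [...byTopic.values()]
    .sort((a, b) => b.messagesPerSec - a.messagesPerSec)
    .slice(0, topN); // input is bounded at O(instances * topN)
}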

// src/lib/server/publish-rate.js
import { redis } from './redis.js';
import { createPublishRateAggregator } from 'svelte-adapter-uws-extensions/redis/publish-rate';

export const aggregator = createPublishRateAggregator(redis, {
  publishInterval: 5_000,
  staleAfter: 12_000,
  topN: 20
});

// In your open hook (or any startup path with a platform reference):
export async function open(ws, { platform }) {
  await aggregator.activate(platform);
}

API

| Member | Description |
| --- | --- |
| instanceId | Stable id for this instance, used as the from-tag on outbound slice envelopes. |
| topPublishers | Cluster-wide top publishers, merged from this instance's fresh local slice and cached non-stale remote slices. Each entry: { topic, messagesPerSec, bytesPerSec, contributingInstances }. Sorted descending by messagesPerSec, capped at topN. Pure in-memory computation. |
| rateOf(topic) | Cluster-wide messagesPerSec for a topic, or 0 if not in the merged top-N. Used by the clusterTopPublisher admission rule. |
| subscribersOf(topic) | Cluster-wide subscriber count for a topic: this instance's live local count (from the optional subjects callback) plus non-stale remote contributions. Returns 0 when no subjects callback is wired and no remote instance has reported the topic. The sharded bus's bus.subscribers(topic) delegates here when an aggregator is wired. |
| activate(platform) | Opens the subscriber and starts the broadcast timer. Idempotent. |
| deactivate() | Stops the timer, drops the subscriber, clears cached slices. |
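For example (topic name illustrative):

const top = aggregator.topPublishers;                 // merged, sorted, capped at topN
const rps = aggregator.rateOf('chat:room-7');         // 0 if outside the merged top-N
const subs = aggregator.subscribersOf('chat:room-7'); // 0 if nothing reported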

Options

| Option | Default | Description |
| --- | --- | --- |
| channel | 'uws:pressure:rates' | Redis channel for slice broadcasts. Override per tenant - see Multi-tenant deployments. |
| publishInterval | 5000 | How often this instance broadcasts its slice (ms). |
| staleAfter | 12000 | Drop a remote instance's slice if no fresher one arrives within this window (ms). Should be at least 2 * publishInterval. |
| topN | 20 | Cap on the per-instance slice and the merged result. Bounds storage cost. Also caps the subs slice (sorted descending by count). |
| subjects | unset | Optional () => Array<{topic, count}> contributor for cluster-wide subscriber counts. Called fresh on every broadcast tick. When wired, the envelope grows a subs field and subscribersOf(topic) returns the merged sum. Pair with the sharded bus via subjects: () => bus.localSubjects(platform). |
| breaker | unset | Optional circuit breaker for the publish call. |
| metrics | unset | Optional Prometheus metrics registry. |

Pairing with the sharded bus (cluster subscriber counts)

The aggregator's subjects option is how cluster-wide subscriber counts flow in. Wire the sharded bus's localSubjects(platform) helper as the contributor; the bus exposes a matching bus.subscribers(topic) that delegates to aggregator.subscribersOf(topic):

import { createShardedBus } from 'svelte-adapter-uws-extensions/redis/sharded-pubsub';
import { createPublishRateAggregator } from 'svelte-adapter-uws-extensions/redis/publish-rate';
import { redis } from './redis.js';

// The subjects callback is invoked lazily on each broadcast tick, so it can
// close over `bus` and `platform` before either exists:
const aggregator = createPublishRateAggregator(redis, {
  subjects: () => bus.localSubjects(platform)
});

// Pass subscribersAggregator so bus.subscribers(topic) returns the
// cluster-wide count rather than just the local one:
const bus = createShardedBus(redis, { subscribersAggregator: aggregator });

// platform comes from your activation path (e.g. the open hook):
await bus.activate(platform);
await aggregator.activate(platform);

const cluster = bus.subscribers('chat:room-7');
// Local count + sum from non-stale remote instances.

Without an aggregator wired, bus.subscribers(topic) returns the local count only. The remote contribution is eventually consistent within publishInterval; the local read is always live. For exact counts (audit log, billing), maintain a Redis SET cluster-wide on subscribe / unsubscribe and SCARD it, as sketched below.
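A sketch of that exact-count pattern, assuming an ioredis-style client and app-chosen key names:

// On subscribe:
await redis.sadd(`subs:${topic}`, connectionId);
// On unsubscribe (and in disconnect cleanup):
await redis.srem(`subs:${topic}`, connectionId);
// Exact cluster-wide count:
const exact = await redis.scard(`subs:${topic}`);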

The unsharded createPubSubBus does not track per-topic state (it forwards every topic through a single Redis channel), so it does not expose localSubjects / subscribers. Apps that need cluster-wide subscriber counts on the unsharded bus thread their own per-topic state into the aggregator’s subjects callback.

Wire envelope

Channel: {channel}                    (default: 'uws:pressure:rates')
Payload: {
  instanceId, ts,
  slice: [{topic, messagesPerSec, bytesPerSec}, ...],
  subs?: [{topic, count}, ...]
}

Receivers merge into a per-instanceId map keyed on the broadcasting instance; entries older than staleAfter are dropped on the next merge. The subs field is omitted when no subjects callback is configured. Aggregators on either side of a version skew tolerate envelopes with or without subs (forward- and backward-compatible).
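The receive path amounts to the following (an illustrative sketch, not the library source; subscriber, slices, and instanceId stand in for the aggregator internals described above):

subscriber.on('message', (_channel, raw) => {
  let env;
  try {
    env = JSON.parse(raw);
  } catch {
    return; // counted as cluster_publish_rate_parse_errors_total
  }
  if (!env?.instanceId || env.instanceId === instanceId) return; // skip self
  // Overwrite this sibling's previous slice; staleness is evaluated
  // against env.ts at merge time:
  slices.set(env.instanceId, { ts: env.ts, slice: env.slice, subs: env.subs ?? [] });
});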

Metrics

| Metric | Description |
| --- | --- |
| cluster_publish_rate_broadcasts_total | Slice envelopes published by this instance. |
| cluster_publish_rate_received_total | Slice envelopes received from sibling instances. |
| cluster_publish_rate_parse_errors_total | Malformed envelopes dropped on receive. |
| cluster_publish_rate_instance_count | Gauge: sibling instances contributing slices (excluding self). Useful for cluster-size monitoring. |
| cluster_topic_publish_rate{topic} | Cluster-wide messagesPerSec, summed across instances. Registered via wireClusterPublishRateMetrics. |
| cluster_topic_publish_bytes{topic} | Cluster-wide bytesPerSec. Same wirer. |

Wire to Prometheus via wireClusterPublishRateMetrics(aggregator, metrics, { topN }). Per-instance metrics via wirePublishRateMetrics(platform, metrics). Both can be active simultaneously - local view shows hot-shard pressure, cluster view shows global capacity. See Metrics.
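For example, assuming metrics is your Prometheus registry (import paths for both wirers are on the Metrics page):

wireClusterPublishRateMetrics(aggregator, metrics, { topN: 20 });
wirePublishRateMetrics(platform, metrics);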
