Admission Control
Two-tier admission keeps a saturated cluster from amplifying load.
| Tier | Where | What it does |
|---|---|---|
| Handshake | adapter WebSocketOptions.upgradeAdmission | Caps concurrent connections (maxConcurrent) and paces accepted upgrades (perTickBudget). 503 fast-fail past the cap. |
| Message | extensions createAdmissionControl | Per-class rules consulted at message dispatch; accepts or rejects each message class based on backpressure signals or custom predicates. |
The handshake tier protects the listen socket; the message tier protects expensive code paths once a connection is open.
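A minimal sketch of the handshake-tier option shape, assuming WebSocketOptions accepts an upgradeAdmission object with the two fields named in the table above (the numbers are illustrative; see the Adapter page for where this object is passed):

```js
// Handshake-tier admission: shape only, values illustrative.
const webSocketOptions = {
  upgradeAdmission: {
    maxConcurrent: 10_000, // 503 fast-fail new upgrades past this many open connections
    perTickBudget: 50      // accept at most this many upgrades per pacing tick
  }
};
```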
Setup
```js
import { redis } from './redis.js'; // your shared Redis client
import { createAdmissionControl } from 'svelte-adapter-uws-extensions/admission';
import { createPublishRateAggregator } from 'svelte-adapter-uws-extensions/redis/publish-rate';

const rates = await createPublishRateAggregator(redis);

const admission = createAdmissionControl({
  rules: {
    'expensive-rpc': ['MEMORY', 'PUBLISH_RATE'],
    'cluster-broadcast': { clusterTopPublisher: { rates, threshold: 1000 } },
    'admin-only': (className, platform) => platform.connections < 1000
  }
});
```
Usage
```js
import { createMessage, LiveError } from 'svelte-realtime/server';

// `admission` and `platform` come from your server setup.
export const message = createMessage({
  async beforeExecute(ws, rpcPath) {
    // classOf: your own mapping from an RPC path to an admission class name.
    const className = classOf(rpcPath);
    if (!await admission.shouldAccept(className, platform)) {
      throw new LiveError('OVERLOADED', 'Server is shedding load');
    }
  }
});
```
Rule shapes
A rule is one of:
- Array of pressure reasons - ['MEMORY', 'PUBLISH_RATE', 'SUBSCRIBERS']. Reject if platform.pressure.reason is any of the listed reasons.
- Predicate - (className, platform) => boolean | Promise<boolean>. Custom logic.
- clusterTopPublisher - { rates, threshold }. Reject if the calling user is in the top-N publishers across the cluster (use with createPublishRateAggregator).
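For instance, a predicate rule can combine backpressure signals with custom state. This is a hedged sketch (truthy return taken to mean accept, as in the 'admin-only' rule above; isDrainingForDeploy is a hypothetical app-level flag):

```js
import { createAdmissionControl } from 'svelte-adapter-uws-extensions/admission';

// Hedged sketch of an async predicate rule: shed 'expensive-rpc' traffic while
// memory pressure is reported or while this instance drains before a deploy.
const admission = createAdmissionControl({
  rules: {
    'expensive-rpc': async (className, platform) => {
      if (platform.pressure.reason === 'MEMORY') return false; // shed under memory pressure
      if (await isDrainingForDeploy()) return false;           // hypothetical app-level flag
      return true;
    }
  }
});
```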
Metrics
| Metric | Type | Labels |
|---|---|---|
| admission_accepted_total | counter | class |
| admission_rejected_total | counter | class, reason |
Cluster-wide publish-rate aggregator
createPublishRateAggregator(redis, options?) exposes a cluster-wide top-publisher view that powers the clusterTopPublisher admission rule. Each instance broadcasts its own platform.pressure.topPublishers slice on a Redis pub/sub channel; every instance maintains a sliding-window view of all instances’ slices and merges them into a cluster-wide top-N. No leader election - each instance is its own aggregator. Storage cost is O(instances * topN) per instance, bounded and small.
```js
// src/lib/server/publish-rate.js
import { redis } from './redis.js';
import { createPublishRateAggregator } from 'svelte-adapter-uws-extensions/redis/publish-rate';

export const aggregator = createPublishRateAggregator(redis, {
  publishInterval: 5_000,
  staleAfter: 12_000,
  topN: 20
});

// In your open hook (or any startup path with a platform reference):
export async function open(ws, { platform }) {
  await aggregator.activate(platform);
}
```
API
| Member | Description |
|---|---|
| instanceId | Stable id for this instance, used as the from-tag on outbound slice envelopes. |
| topPublishers | Cluster-wide top publishers, merged from this instance’s fresh local slice and cached non-stale remote slices. Each entry: { topic, messagesPerSec, bytesPerSec, contributingInstances }. Sorted descending by messagesPerSec, capped at topN. Pure memory computation. |
| rateOf(topic) | Cluster-wide messagesPerSec for a topic, or 0 if not in the merged top-N. Used by the clusterTopPublisher admission rule. |
| subscribersOf(topic) | Cluster-wide subscriber count for a topic, summed across this instance’s live local count (from the optional subjects callback) and non-stale remote contributions. Returns 0 when no subjects is wired and no remote instance has reported the topic. The sharded bus’s bus.subscribers(topic) delegates here when an aggregator is wired. |
| activate(platform) | Open the subscriber and start the broadcast timer. Idempotent. |
| deactivate() | Stop the timer, drop the subscriber, clear cached slices. |
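For orientation, reading the merged view never touches Redis. A short sketch against the aggregator from the setup above ('chat:room-7' is just an example topic):

```js
// All three reads are pure in-memory computations over the merged slices.
const top = aggregator.topPublishers;                        // sorted by messagesPerSec, capped at topN
const rate = aggregator.rateOf('chat:room-7');               // cluster-wide messagesPerSec, 0 if outside the top-N
const subscribers = aggregator.subscribersOf('chat:room-7'); // 0 unless subjects is wired or a remote instance reported it
```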
Options
| Option | Default | Description |
|---|---|---|
| channel | 'uws:pressure:rates' | Redis channel for slice broadcasts. Override per tenant - see Multi-tenant deployments. |
| publishInterval | 5000 | How often this instance broadcasts its slice (ms). |
| staleAfter | 12000 | Drop a remote instance’s slice if no fresher one arrives within this window (ms). Should be at least 2 * publishInterval. |
| topN | 20 | Cap on per-instance slice and merged result. Bounds storage cost. Also caps the subs slice (sorted descending by count). |
| subjects | unset | Optional () => Array<{topic, count}> contributor for cluster-wide subscriber counts. Called fresh on every broadcast tick. When wired, the envelope grows a subs field and subscribersOf(topic) returns the merged sum. Pair with the sharded bus via subjects: () => bus.localSubjects(platform). |
| breaker | unset | Optional circuit breaker for the publish call. |
| metrics | unset | Optional Prometheus metrics registry. |
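For example, the channel option is the isolation point in multi-tenant deployments. A sketch only - the tenant-suffixed naming scheme and tenantId are illustrative:

```js
import { redis } from './redis.js';
import { createPublishRateAggregator } from 'svelte-adapter-uws-extensions/redis/publish-rate';

// Per-tenant channel so tenants never merge each other's slices.
const aggregator = createPublishRateAggregator(redis, {
  channel: `uws:pressure:rates:${tenantId}` // tenantId is an assumed app-level value
});
```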
Pairing with the sharded bus (cluster subscriber counts)
The aggregator’s subjects option is the hook for cluster-wide subscriber counts. Wire the sharded bus’s localSubjects(platform) helper as the contributor; the bus exposes a matching bus.subscribers(topic) that delegates to aggregator.subscribersOf(topic):
```js
import { createShardedBus } from 'svelte-adapter-uws-extensions/redis/sharded-pubsub';
import { createPublishRateAggregator } from 'svelte-adapter-uws-extensions/redis/publish-rate';

// `redis` and `platform` come from your server setup (see the snippets above).
// `bus` is declared below; the subjects callback only runs after activation.
const aggregator = createPublishRateAggregator(redis, {
  subjects: () => bus.localSubjects(platform)
});

// Pass subscribersAggregator into the bus so bus.subscribers(topic)
// returns the cluster-wide count rather than just the local one:
const bus = createShardedBus(redis, { subscribersAggregator: aggregator });

await bus.activate(platform);
await aggregator.activate(platform);

const cluster = bus.subscribers('chat:room-7');
```
The result is the local count plus the sum from non-stale remote instances. Without an aggregator wired, bus.subscribers(topic) returns the local count only. The remote contribution is eventually consistent within publishInterval; the local read is always live. For exact counts (audit log, billing), track a Redis SET cluster-wide on subscribe / unsubscribe and SCARD it.
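A minimal sketch of that exact-count pattern, assuming an ioredis-style client (sadd / srem / scard) and an illustrative key scheme:

```js
import { redis } from './redis.js';

// Exact cluster-wide subscriber membership via a Redis SET.
// Key scheme and helper names are illustrative; call these from your
// subscribe / unsubscribe paths.
export async function trackSubscribe(topic, connectionId) {
  await redis.sadd(`subscribers:${topic}`, connectionId);
}

export async function trackUnsubscribe(topic, connectionId) {
  await redis.srem(`subscribers:${topic}`, connectionId);
}

export async function exactSubscriberCount(topic) {
  return redis.scard(`subscribers:${topic}`); // exact, at the cost of a Redis round-trip
}
```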
The unsharded createPubSubBus does not track per-topic state (it forwards every topic through a single Redis channel), so it does not expose localSubjects / subscribers. Apps that need cluster-wide subscriber counts on the unsharded bus thread their own per-topic state into the aggregator’s subjects callback.
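A hedged sketch of that approach: maintain a per-topic count yourself and expose it through subjects. The Map and the increment/decrement hooks are application code, not part of the extensions package:

```js
import { redis } from './redis.js';
import { createPublishRateAggregator } from 'svelte-adapter-uws-extensions/redis/publish-rate';

// Application-maintained per-topic counts for the unsharded bus.
const localCounts = new Map(); // topic -> live local subscriber count

export function onSubscribe(topic) {
  localCounts.set(topic, (localCounts.get(topic) ?? 0) + 1);
}

export function onUnsubscribe(topic) {
  const next = (localCounts.get(topic) ?? 1) - 1;
  next > 0 ? localCounts.set(topic, next) : localCounts.delete(topic);
}

const aggregator = createPublishRateAggregator(redis, {
  // Called fresh on every broadcast tick; shape matches the subjects contract above.
  subjects: () => [...localCounts].map(([topic, count]) => ({ topic, count }))
});
```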
Wire envelope
Channel: {channel} (default: 'uws:pressure:rates')

Payload:

```
{
  instanceId, ts,
  slice: [{ topic, messagesPerSec, bytesPerSec }, ...],
  subs?: [{ topic, count }, ...]
}
```

Receivers merge into a per-instanceId map keyed on the broadcasting instance; entries older than staleAfter are dropped on the next merge. The subs field is omitted when no subjects callback is configured. Aggregators on either side of a version skew tolerate envelopes with or without subs (forward- and backward-compatible).
Metrics
| Metric | Description |
|---|---|
| cluster_publish_rate_broadcasts_total | Slice envelopes published by this instance. |
| cluster_publish_rate_received_total | Slice envelopes received from sibling instances. |
| cluster_publish_rate_parse_errors_total | Malformed envelopes dropped on receive. |
| cluster_publish_rate_instance_count | Gauge: sibling instances contributing slices (excluding self). Useful for cluster-size monitoring. |
| cluster_topic_publish_rate{topic} | Cluster-wide messagesPerSec, summed across instances. Registered via wireClusterPublishRateMetrics. |
| cluster_topic_publish_bytes{topic} | Cluster-wide bytesPerSec. Same wirer. |
Wire to Prometheus via wireClusterPublishRateMetrics(aggregator, metrics, { topN }). Per-instance metrics via wirePublishRateMetrics(platform, metrics). Both can be active simultaneously - local view shows hot-shard pressure, cluster view shows global capacity. See Metrics.
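A minimal sketch of wiring both views, using the call signatures named above. The import path is an assumption here; the Metrics page documents the real entry point:

```js
// Import path is an assumption - see the Metrics page for the actual entry point.
import {
  wirePublishRateMetrics,
  wireClusterPublishRateMetrics
} from 'svelte-adapter-uws-extensions/metrics';

// Per-instance view: hot-shard pressure on this worker.
wirePublishRateMetrics(platform, metrics);

// Cluster-wide view: global capacity, merged from all instances' slices.
wireClusterPublishRateMetrics(aggregator, metrics, { topN: 20 });
```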
See also
- Adapter upgradeAdmission - handshake-tier cap.
- Adapter platform.pressure - per-worker backpressure source.