Resilience & Testing
Circuit Breaker
Prevents a thundering herd when a backend goes down. When Redis or Postgres becomes unreachable, every extension that uses the breaker fails fast instead of queueing up timeouts, and fire-and-forget operations (heartbeats, relay flushes, cursor broadcasts) are skipped entirely.
Three states:
- healthy - everything works, requests go through
- broken - too many failures, requests fail fast via CircuitBrokenError
- probing - one request is allowed through to test if the backend is back
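The three states can be pictured as a small state machine. The block below is a minimal sketch written for illustration, not the library's source: createSketchBreaker, SketchCircuitBrokenError, and the injectable now clock are hypothetical names, and the broken-to-probing transition is evaluated lazily from the clock here, whereas the real breaker may use timers. The method names mirror the ones listed in the API table.

```javascript
// Illustrative sketch only; names and internals are assumptions, not the library's code.
class SketchCircuitBrokenError extends Error {}

function createSketchBreaker({ failureThreshold = 5, resetTimeout = 30000, now = Date.now } = {}) {
  let state = 'healthy';
  let failures = 0;
  let brokenAt = 0;
  let probeTaken = false;

  // Assumption: broken -> probing is checked lazily against the clock.
  function current() {
    if (state === 'broken' && now() - brokenAt >= resetTimeout) {
      state = 'probing';
      probeTaken = false;
    }
    return state;
  }

  function trip() {
    state = 'broken';
    brokenAt = now();
  }

  return {
    get state() {
      return current();
    },
    guard() {
      const s = current();
      if (s === 'broken' || (s === 'probing' && probeTaken)) {
        throw new SketchCircuitBrokenError('circuit is broken');
      }
      if (s === 'probing') probeTaken = true; // exactly one probe goes through
    },
    success() {
      failures = 0;
      state = 'healthy';
    },
    failure() {
      const s = current();
      if (s === 'broken') return; // already failing fast
      failures += 1;
      // A failed probe re-breaks immediately; otherwise wait for the threshold.
      if (s === 'probing' || failures >= failureThreshold) trip();
    }
  };
}
```

Injecting the clock keeps the sketch deterministic in tests: advancing a fake now() past resetTimeout is enough to observe the probing state.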
Setup
// src/lib/server/breaker.js
import { createCircuitBreaker } from 'svelte-adapter-uws-extensions/breaker';
export const breaker = createCircuitBreaker({
  failureThreshold: 5,
  resetTimeout: 30000,
  onStateChange: (from, to) => console.log(`circuit: ${from} -> ${to}`)
});
Pass the same breaker to all extensions that share a backend:
import { breaker } from './breaker.js';
export const bus = createPubSubBus(redis, { breaker });
export const presence = createPresence(redis, { breaker, key: 'id' });
export const replay = createReplay(redis, { breaker });
export const limiter = createRateLimit(redis, { points: 10, interval: 1000, breaker });
Failures from any extension contribute to the same breaker. When one trips it, all others fail fast.
Options
| Option | Default | Description |
|---|---|---|
| failureThreshold | 5 | Consecutive failures before breaking |
| resetTimeout | 30000 | Ms before transitioning from broken to probing |
| onStateChange | - | Called on state transitions: (from, to) => void |
API
| Method / Property | Description |
|---|---|
| breaker.state | 'healthy', 'broken', or 'probing' |
| breaker.isHealthy | true only when state is 'healthy' |
| breaker.failures | Current consecutive failure count |
| breaker.guard() | Throws CircuitBrokenError if the circuit is broken |
| breaker.success() | Record a successful operation |
| breaker.failure() | Record a failed operation |
| breaker.subscribe(handler) | Multi-listener state-change subscription. handler(from, to). Returns an unsubscribe function. Listener errors do not break other listeners. |
| breaker.reset() | Force back to healthy |
| breaker.destroy() | Clear internal timers |
breaker.subscribe(handler) is multi-listener since 0.5; the constructor onStateChange still works for the single-listener case. The pubsub bus auto-emits degraded / recovered system-channel events when it shares a breaker - see Redis pub/sub.
How extensions use it
Awaited operations (join, consume, publish) call guard() before the Redis/Postgres call, success() after, and failure() in the catch block. When the circuit is broken, guard() throws CircuitBrokenError and the operation never reaches the backend.
Fire-and-forget operations (heartbeat refresh, relay flush, cursor broadcast) check isHealthy and skip entirely when the circuit is not healthy. This prevents piling up commands on a dead connection.
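If you want your own backend calls to follow the same pattern, the two code paths described above can be sketched roughly as follows. The helpers guarded and fireAndForget are hypothetical and not exported by the library; they only assume a breaker object with the documented guard(), success(), failure(), and isHealthy members. Swallowing errors in the fire-and-forget path is an assumption made for this sketch.

```javascript
// Hypothetical helpers for illustration; the extensions apply this pattern internally.
async function guarded(breaker, operation) {
  breaker.guard(); // throws when the circuit is broken, before touching the backend
  try {
    const result = await operation();
    breaker.success();
    return result;
  } catch (err) {
    breaker.failure();
    throw err;
  }
}

function fireAndForget(breaker, operation) {
  if (!breaker.isHealthy) return false; // skipped entirely, nothing is queued
  // Assumption: background errors are swallowed rather than reported.
  Promise.resolve().then(operation).catch(() => {});
  return true;
}
```

A caller would wrap an awaited backend call as guarded(breaker, () => someBackendCall()) and use the boolean returned by fireAndForget to tell whether the background work was attempted at all.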
Per-extension degradation behavior
| Extension | Awaited operations | Fire-and-forget operations |
|---|---|---|
| Pub/sub bus | wrap().publish() queues to local platform only; relay to Redis is skipped silently | Microtask relay flush is skipped entirely |
| Presence | join() / leave() throw CircuitBrokenError | Heartbeat refresh and stale cleanup are skipped |
| Replay buffer | publish() / replay() / seq() throw CircuitBrokenError | - |
| Rate limiting | consume() throws CircuitBrokenError (fail-closed - requests are blocked, not allowed through) | - |
| Broadcast groups | join() / leave() throw CircuitBrokenError | Heartbeat refresh is skipped |
| Cursor | - | Hash writes and cross-instance relay are skipped; local throttle continues |
| LISTEN/NOTIFY | activate() throws; auto-reconnect retries on its own interval | - |
Note that rate limiting is fail-closed: when the circuit is broken, consume() throws and requests are blocked rather than allowed through. This is a deliberate safety choice.
Error handling
import { CircuitBrokenError } from 'svelte-adapter-uws-extensions/breaker';
try {
  await replay.publish(platform, 'chat', 'msg', data);
} catch (err) {
  if (err instanceof CircuitBrokenError) {
    // Backend is down -- degrade gracefully
    platform.publish('chat', 'msg', data); // local-only delivery
  } else {
    throw err; // unrelated errors should not be swallowed
  }
}
Notifying clients of degradation
When Redis pub/sub fails, live streams on other replicas stop receiving updates. Connected clients keep showing stale data with no indication that the stream is degraded. Use the onStateChange callback to publish a system-level event so clients can surface this:
import { createCircuitBreaker } from 'svelte-adapter-uws-extensions/breaker';
let distributed; // the bus.wrap(platform) reference; assign it once the platform is available
export const breaker = createCircuitBreaker({
  failureThreshold: 5,
  resetTimeout: 30000,
  onStateChange: (from, to) => {
    if (!distributed) return;
    if (to === 'broken') {
      // Local-only publish - Redis is down, but local clients still receive it
      distributed.publish('__system', 'degraded', { reason: 'backend unavailable' });
    } else if (to === 'healthy') {
      // Recovery normally arrives as probing -> healthy, so match on the target state only
      distributed.publish('__system', 'recovered', null);
    }
  }
});
On the client side, subscribe to __system and show a banner when the degraded event fires. On recovered, dismiss the banner and refetch stale data.
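The client-side handling can be kept as a small pure function, which makes it easy to test independently of the transport. This is a sketch under assumptions: applySystemEvent and the shape of its state object are hypothetical, and how __system events actually reach it depends on your client subscription code, which is not shown here.

```javascript
// Hypothetical client-side state transition for '__system' events.
// state.degraded drives the banner; state.refetch signals that stale data should be reloaded.
function applySystemEvent(state, event) {
  if (event === 'degraded') {
    return { degraded: true, refetch: false };
  }
  if (event === 'recovered') {
    // Only refetch if the stream was previously marked degraded.
    return { degraded: false, refetch: state.degraded };
  }
  return state; // unknown events leave the state untouched
}
```

Feeding each incoming event name through this function and rendering the banner from state.degraded keeps the UI logic in one place.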
Graceful Shutdown
All clients listen for the sveltekit:shutdown event and disconnect cleanly by default. You can disable this with autoShutdown: false and manage the lifecycle yourself.
// Manual shutdown
await redis.quit();
await pg.end();
presence.destroy();
cursors.destroy();
lobby.destroy();
breaker.destroy();
Call destroy() on any extension that runs background timers (presence heartbeats, cursor throttle timers, circuit breaker reset timers, Postgres cleanup intervals). The client quit()/end() methods close the underlying connections.
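When you manage the lifecycle yourself, one thing to watch is that a single failing step (for example a quit() on an already-closed connection) does not skip the remaining cleanup. A minimal sketch of one way to handle this, using a hypothetical shutdownAll helper that is not part of the library:

```javascript
// Hypothetical helper: runs every shutdown step in order, even if an earlier one fails.
// Each step is a [name, fn] pair; fn may be sync or async.
async function shutdownAll(steps) {
  const failures = [];
  for (const [name, step] of steps) {
    try {
      await step();
    } catch (err) {
      failures.push({ name, err }); // remember the failure and keep going
    }
  }
  return failures;
}

// Usage, assuming the instances from the manual shutdown snippet above:
// const failures = await shutdownAll([
//   ['redis', () => redis.quit()],
//   ['pg', () => pg.end()],
//   ['presence', () => presence.destroy()],
//   ['breaker', () => breaker.destroy()]
// ]);
```

The returned list can be logged before the process exits so that a failed step is still visible.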
Testing
Tests use in-memory mocks for Redis and Postgres - no running services needed.
npm test
Since the extensions match the core plugin APIs, you can swap between in-memory plugins (for tests and single-instance dev) and Redis/Postgres extensions (for production) by changing the import path. Structure your code so the extension instance is created in a single file (src/lib/server/replay.js, etc.) and imported everywhere else. To switch backends, change that one file.
Testing your own extension-consuming code
The svelte-adapter-uws-extensions/testing entry point exports the same in-memory mocks used by the extensions’ own test suite. Use them to test your code without running Redis or Postgres:
import { mockRedisClient, mockPlatform, mockWs } from 'svelte-adapter-uws-extensions/testing';
import { createPresence } from 'svelte-adapter-uws-extensions/redis/presence';
import { createRateLimit } from 'svelte-adapter-uws-extensions/redis/ratelimit';
import { describe, it, expect } from 'vitest';
describe('presence', () => {
  it('tracks users across topics', async () => {
    const client = mockRedisClient();
    const platform = mockPlatform();
    const presence = createPresence(client, { key: 'id' });
    const ws = mockWs({ id: 'user-1', name: 'Alice' });
    await presence.join(ws, 'room:lobby', platform);
    expect(await presence.count('room:lobby')).toBe(1);
    expect(platform.published.some(p => p.event === 'join')).toBe(true);
    presence.destroy();
  });
});
describe('rate limiting', () => {
  it('blocks after exhausting points', async () => {
    const client = mockRedisClient();
    const limiter = createRateLimit(client, { points: 3, interval: 10000 });
    const ws = mockWs({ remoteAddress: '1.2.3.4' });
    for (let i = 0; i < 3; i++) {
      expect((await limiter.consume(ws)).allowed).toBe(true);
    }
    expect((await limiter.consume(ws)).allowed).toBe(false);
  });
});
Available mocks
| Export | What it mocks | Supports |
|---|---|---|
| mockRedisClient(prefix?) | createRedisClient() | Strings, hashes, sorted sets, pub/sub, pipelines, scan, Lua eval for all extension scripts |
| mockPlatform() | Platform API | publish, send, sendCoalesced, sendTo, batch, topic, request, subscribe, unsubscribe, checkSubscribe, publishBatched, requestId, pressure, onPressure, onPublishRate, maxPayloadLength, bufferedAmount. Records all calls in .published, .sent, etc. |
| mockWs(userData?) | uWS WebSocket | subscribe(), unsubscribe(), getUserData(), getBufferedAmount(), close() |
| mockPgClient() | createPgClient() | SQL parsing for replay buffer operations, sequence counters |
| PLATFORM_KEYS | constant | Single source of truth for the Platform members the mock and bus.wrap() are expected to expose. Use in custom mocks to assert parity. |
mockPlatform() includes _emitPublishRate(events) for tests that need to drive onPublishRate callbacks synchronously.
The circuit breaker (createCircuitBreaker()) is pure logic with no I/O — use it directly in tests, no mock needed.
Adapter wire-shape helpers
svelte-adapter-uws-extensions/testing re-exports a curated set of wire-protocol helpers from svelte-adapter-uws/testing (esc, completeEnvelope, wrapBatchEnvelope, isValidWireTopic, createScopedTopic, collapseByCoalesceKey, resolveRequestId, createChaosState, plus userData slot constants WS_SUBSCRIPTIONS, WS_COALESCED, WS_SESSION_ID, WS_PENDING_REQUESTS, WS_STATS, WS_PLATFORM, WS_CAPS, WS_REQUEST_ID_KEY).
Capacity caps for tests
Every internal Map / Set / queue of factory-or-module-level scope declares an explicit upper bound and a documented saturation behavior, so a runaway publisher, a subscribe-in-loop bug, or a topic-cardinality leak cannot exhaust process memory silently. Mirrors the adapter’s capacity-cap pattern, scaled one tier up: where the adapter caps per-connection at 1M, extensions cap per-instance at 10M (an instance can hold ~1M concurrent uWS connections with ~10 entries of state per connection on average), and cluster-wide warn-only caps land at 100M.
Saturation behavior is matched to each data structure’s contract:
- State the protocol depends on (userToInstance for sendTo routing, secondary-index buckets, cluster-wide user index): warn-only. Eviction would corrupt routing or matching, so a single structured console.warn fires the first time the cap is crossed and the index keeps growing.
- State bounded by the caller (registry pending requests, presence ws, cursor ws, sharded bus topics, groups local members): reject new - the adding caller surfaces an explicit error and the saturated entry is not added.
- Per-tick microtask batches (sharded bus relay, pubsub relay): warn-only - the batch drains every microtask, so reaching the cap in one tick means a synchronous burst leak that the warn surfaces without dropping in-flight publishes.
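The difference between the warn-only and reject-new policies can be sketched with a small capped map. This is an illustration only, not the code in shared/caps.js: createCappedMap, the policy strings, and the injectable warn function are hypothetical names chosen for the example.

```javascript
// Illustrative sketch of the two saturation policies; not the library's implementation.
function createCappedMap(cap, policy, warn = console.warn) {
  const map = new Map();
  let warned = false;
  return {
    map,
    set(key, value) {
      const isNew = !map.has(key);
      if (isNew && map.size >= cap) {
        if (policy === 'reject-new') {
          // The caller sees an explicit error and the entry is not added.
          throw new Error(`cap of ${cap} entries exceeded`);
        }
        // warn-only: log once, then keep growing so routing state is never evicted.
        if (!warned) {
          warned = true;
          warn(`cap of ${cap} entries crossed; continuing to grow`);
        }
      }
      map.set(key, value);
    }
  };
}
```

Updating an existing key never counts against the cap in this sketch, since it does not grow the structure.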
Caps live as named constants in shared/caps.js and are importable for tests that need to assert saturation behavior:
import {
  MAX_REGISTRY_SESSIONS_PER_INSTANCE,
  MAX_BREAKER_LISTENERS,
  MAX_AGGREGATOR_REMOTE_INSTANCES
} from 'svelte-adapter-uws-extensions';
| Cap | Default | Saturation | Notes |
|---|---|---|---|
| MAX_REGISTRY_SESSIONS_PER_INSTANCE | 10,000,000 | reject new | hooks.open skips the registration past the cap |
| MAX_REGISTRY_PENDING_REQUESTS | 10,000,000 | reject new | request(...) rejects with “pending requests exceeded” |
| MAX_REGISTRY_USER_INDEX | 100,000,000 | warn-only | cluster-wide; eviction would mis-route sendTo |
| MAX_REGISTRY_INDEX_VALUES_PER_KEY | 10,000,000 | warn-only | per-attribute-key bucket in the secondary index |
| MAX_SHARDED_BUS_TOPICS | 10,000,000 | reject new | follow / followBatch reject distinct-new past the cap |
| MAX_SHARDED_BUS_BATCH_CHANNELS_PER_TICK | 1,000,000 | warn-only | per-microtask outbound batch |
| MAX_PUBSUB_RELAY_BATCH_PER_TICK | 1,000,000 | warn-only | per-microtask outbound batch |
| MAX_PRESENCE_WS | 10,000,000 | reject new | per-instance ws joins |
| MAX_PRESENCE_TOPICS | 10,000,000 | reject new | per-instance topic count |
| MAX_CURSOR_WS | 10,000,000 | reject new | per-instance ws cursor activity |
| MAX_CURSOR_TOPICS | 10,000,000 | reject new | per-instance cursor topic count |
| MAX_GROUPS_LOCAL_MEMBERS | 10,000,000 | reject new | treated as “group full” |
| MAX_TASK_HANDLERS | 10,000 | reject new | bootstrap-time register(name, handler) calls |
| MAX_REDIS_DUPLICATES_PER_CLIENT | 1,000 | warn-only | duplicate ioredis connections per client wrapper |
| MAX_AGGREGATOR_REMOTE_INSTANCES | 10,000 | warn-only | sibling instances on the publish-rate aggregator |
| MAX_BREAKER_LISTENERS | 10,000 | reject new | listeners on a single breaker |
Caps are not currently configurable per-instance; if a real workload needs a tighter or looser bound for a specific module, raise an issue and we’ll add a per-factory option. Aggregate-memory protection against a 10M-connection DoS still belongs to the adapter’s upgradeAdmission.maxConcurrent; per-instance caps are not the right place for that.