Replay Buffer

Same API as the core createReplay plugin, but backed by Redis sorted sets. Messages survive restarts and are shared across instances.

Sequence numbers are incremented atomically via a Lua script (INCR + ZADD + trim in a single EVAL), so concurrent publishes from multiple instances produce strictly ordered, gap-free sequences per topic. When the buffer exceeds size, the oldest entries are removed inside the same Lua script - no second round trip required.
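The single-EVAL step described above can be sketched as follows. This is an illustration under assumptions, not the extension's actual script: the `seq:payload` member format and the `publishAtomic` helper are hypothetical, key prefixes are omitted, and the client is only assumed to expose an ioredis-style `eval(script, numKeys, ...keysAndArgs)`.

```javascript
// Hypothetical Lua script: INCR the per-topic counter, ZADD the entry with the
// new sequence as its score, then trim the oldest entries beyond `size`.
// All three steps run inside one EVAL, so no other command can interleave.
const PUBLISH_SCRIPT = `
local seq = redis.call('INCR', KEYS[1])
redis.call('ZADD', KEYS[2], seq, seq .. ':' .. ARGV[1])
local excess = redis.call('ZCARD', KEYS[2]) - tonumber(ARGV[2])
if excess > 0 then
  redis.call('ZREMRANGEBYRANK', KEYS[2], 0, excess - 1)
end
return seq
`;

// Hypothetical helper: runs the script for one topic and resolves to the new seq.
async function publishAtomic(redis, topic, payload, size) {
  const seqKey = `replay:seq:${topic}`; // sequence counter
  const bufKey = `replay:buf:${topic}`; // sorted-set buffer
  return redis.eval(PUBLISH_SCRIPT, 2, seqKey, bufKey, JSON.stringify(payload), size);
}
```

Prefixing the member with the sequence keeps sorted-set members unique even when two payloads are byte-identical; whether the real script does the same is not stated here.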

When a client requests replay, the buffer checks whether the client’s last-seen sequence is older than the oldest buffered entry. If it is (the buffer was trimmed past the client’s position), a truncated event fires on __replay:{topic} before any msg events, so the client knows it missed messages and can do a full reload. This also fires when the buffer is completely empty but the sequence counter has advanced past the client’s position (e.g. all entries expired via TTL).
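The truncation decision can be sketched as a pure function. This is a minimal model, assuming a gap exists whenever entry `lastSeenSeq + 1` is no longer buffered; the exact boundary the library uses, and how it treats `lastSeenSeq = 0`, are not modelled. `isTruncated` and its parameters are hypothetical names.

```javascript
// oldestSeq:  sequence of the oldest buffered entry, or null when the buffer is empty.
// currentSeq: current value of the topic's sequence counter.
function isTruncated(lastSeenSeq, oldestSeq, currentSeq) {
  if (oldestSeq === null) {
    // Empty buffer: anything published after lastSeenSeq is gone
    // (for example, every entry expired via TTL).
    return currentSeq > lastSeenSeq;
  }
  // Non-empty buffer: the entry right after the client's position must still exist.
  return oldestSeq > lastSeenSeq + 1;
}
```

A client at sequence 5 with a buffer starting at 6 can be caught up normally; the same client with a buffer starting at 8 has lost entries 6 and 7 and should reload.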

When to use over the built-in plugin: The core replay buffer lives in process memory - it is lost on restart and not shared between instances. Use the Redis version when you need message history that survives deploys or is consistent across a cluster.

Setup

// src/lib/server/replay.js
import { redis } from './redis.js';
import { createReplay } from 'svelte-adapter-uws-extensions/redis/replay';

export const replay = createReplay(redis, {
  size: 500,
  ttl: 3600  // expire after 1 hour
});

Usage

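// In a form action or API route
import { replay } from '$lib/server/replay.js';

export const actions = {
  send: async ({ request, platform }) => {
    const data = Object.fromEntries(await request.formData());
    const msg = await db.createMessage(data); // db is your own data layer
    // Store the message in the buffer and broadcast it to live subscribers
    await replay.publish(platform, 'chat', 'created', msg);
  }
};

// In +page.server.js
import { replay } from '$lib/server/replay.js';

export async function load() {
  const messages = await db.getRecentMessages();
  // Return the current sequence so the client knows where to resume from
  return { messages, seq: await replay.seq('chat') };
}

// In hooks.ws.js - handle replay requests
import { replay } from '$lib/server/replay.js';

export async function message(ws, { data, platform }) {
  const msg = JSON.parse(Buffer.from(data).toString());
  if (msg.type === 'replay') {
    // Send everything after msg.since on msg.topic to this one client
    await replay.replay(ws, msg.topic, msg.since, platform);
    return;
  }
}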

Options

| Option | Default | Description |
| --- | --- | --- |
| size | 1000 | Max messages per topic |
| ttl | 0 | Key expiry in seconds (0 = never) |
| storage | 'sortedset' | Backend: 'sortedset' (Redis sorted sets) or 'stream' (Redis Streams). |
| durability | 'best-effort' | 'best-effort' or 'replicated'. Replicated mode runs WAIT minReplicas replicationTimeoutMs after each write and throws ReplicationTimeoutError when too few replicas ack within the timeout. |
| localFanoutOnStorageFailure | false | On storage failure, fall back to a best-effort platform.publish. Each fallback increments the replay_storage_fallbacks_total{topic} counter. publishIdempotent always throws on storage failure, regardless of this option. |
| idempotencyTtl | 48 * 3600 | TTL in seconds for publishIdempotent dedup state (stream backend only). |

Storage backends

storage: 'stream' switches the backend to Redis Streams (XADD / XRANGE with <seq>-0 IDs). Same external contract - the same publish / seq / gap / since / replay / clear methods, same durability: 'replicated' mode, same metrics. Two changes under the hood:

  • Listpack encoding is more compact than sorted-set encoding for typical message shapes - meaningfully smaller memory for buffers in the thousands of entries per topic. The stream backend works on Redis 5+; listpack encoding is the Redis 7+ default that delivers the memory win.
  • Native seq-filtered scans. Stream IDs are <seq>-0, where seq is the same INCR counter the sorted-set backend uses. XRANGE with an exclusive start of (<seq>-0 filters natively by sequence number, so range queries skip the app-side scan that the sorted-set backend does for some paths.
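The ID scheme and the range query can be sketched with a few helpers. The function names are hypothetical and this is not the extension's internal code; it only shows how a `<seq>-0` ID maps to XRANGE arguments.

```javascript
// Stream entry ID for a given sequence number: "<seq>-0".
function streamId(seq) {
  return `${seq}-0`;
}

// Arguments for an XRANGE that returns entries strictly after `sinceSeq`.
// The "(" prefix marks an exclusive bound and "+" is the maximum possible ID.
function xrangeSinceArgs(streamKey, sinceSeq) {
  return [streamKey, `(${streamId(sinceSeq)}`, '+'];
}

// Recover the sequence number from an entry ID returned by XRANGE.
function seqFromStreamId(id) {
  return Number(id.split('-')[0]);
}
```

With an ioredis-style client this would be called as `redis.xrange(...xrangeSinceArgs(key, 42))`. Exclusive bounds are a Redis 6.2+ feature; because every ID here ends in -0, an inclusive start of `<seq + 1>-0` selects the same entries on older servers.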

Both backends share the same seq-counter key ({prefix}replay:seq:{topic}) but use different buf-key prefixes (replay:buf:{topic} for sorted-set, replay:streambuf:{topic} for stream), so they can coexist on the same Redis without WRONGTYPE collisions. A single topic should pick one backend at startup and stay there - there is no built-in migration helper for switching an existing topic mid-flight.
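The key layout above can be written down as a small helper. `replayKeys` is a hypothetical name, and this sketch assumes the configured prefix is applied to the buffer keys in the same way as to the seq-counter key.

```javascript
// Build the Redis key names for one topic under a given backend.
function replayKeys(prefix, topic, storage) {
  // Sorted-set and stream buffers use different segments, so the two
  // data types never share a key and cannot trigger WRONGTYPE errors.
  const bufSegment = storage === 'stream' ? 'streambuf' : 'buf';
  return {
    seqKey: `${prefix}replay:seq:${topic}`, // shared by both backends
    bufKey: `${prefix}replay:${bufSegment}:${topic}`
  };
}
```

Because the counter key is shared while the buffer keys differ, switching a live topic from one backend to the other would continue the sequence but leave the old buffer behind, which is why a topic should stay on one backend.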

Idempotent publish (stream backend only)

For producers that need at-most-once semantics under retry, the stream backend exposes publishIdempotent:

const replay = createReplay(redis, { storage: 'stream' });

const { seq, isDuplicate } = await replay.publishIdempotent(platform, 'orders', 'created', order, {
  producerId: 'order-service',
  requestId: order.clientOrderId   // stable per-operation id supplied by the caller
});

On a fresh (producerId, requestId) tuple, the call performs the same INCR + XADD + broadcast as publish() and stashes seq in a per-(producer, topic) dedup hash. On a repeat tuple within idempotencyTtl (default 48 hours), the call returns the cached seq, skips the XADD, and skips the local broadcast - the original publish already broadcast to live consumers, and reconnecting consumers pick the entry up via replay() from the buffer.

The seq counter only advances on fresh writes, so duplicate retries do not introduce gaps that would trigger false-positive truncation events on consumers.
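The dedup behaviour can be modelled in memory to make the sequencing rule concrete. This is a simplified sketch, not the Redis implementation: `createIdempotentModel` is a hypothetical name, plain Maps stand in for the dedup hash and the stream, and neither idempotencyTtl expiry nor the broadcast is modelled.

```javascript
function createIdempotentModel() {
  const seqByTopic = new Map(); // topic -> last assigned seq
  const dedup = new Map();      // "producerId:topic" -> Map(requestId -> seq)
  const buffer = [];            // stands in for the stream

  return {
    publishIdempotent(topic, data, { producerId, requestId }) {
      const dedupKey = `${producerId}:${topic}`;
      if (!dedup.has(dedupKey)) dedup.set(dedupKey, new Map());
      const seen = dedup.get(dedupKey);

      // Repeat tuple: return the cached seq, write nothing, advance nothing.
      if (seen.has(requestId)) {
        return { seq: seen.get(requestId), isDuplicate: true };
      }

      // Fresh tuple: advance the counter, append, remember the seq.
      const seq = (seqByTopic.get(topic) || 0) + 1;
      seqByTopic.set(topic, seq);
      buffer.push({ topic, seq, data });
      seen.set(requestId, seq);
      return { seq, isDuplicate: false };
    },
    seq(topic) { return seqByTopic.get(topic) || 0; },
    entries() { return buffer.slice(); }
  };
}
```

Retrying the same requestId leaves both the counter and the buffer untouched, so consumers never observe a sequence number without a matching entry.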

The dedup cache key is {prefix}replay:idmp:{producerId}:{topic} - topic-scoped, so the same requestId can be reused across topics without collision. Override the TTL per call via opts.idempotencyTtl, or globally via idempotencyTtl on createReplay.

Pairs naturally with the durable task runner: a task that publishes to the replay buffer can pass its task id as requestId so worker-crash retries don’t double-publish.

Replicated durability

For loss-sensitive flows (audit logs, financial events) opt in with durability: 'replicated'. After the write to the master, publish() runs WAIT minReplicas replicationTimeoutMs. If fewer than minReplicas replicas ack within the timeout, publish() throws ReplicationTimeoutError and skips the local broadcast - the data is on the master only, and broadcasting would commit live consumers to state that could be lost if the master fails before replicas catch up.

import { createReplay, ReplicationTimeoutError } from 'svelte-adapter-uws-extensions/redis/replay';

const replay = createReplay(redis, {
  durability: 'replicated',
  minReplicas: 1,
  replicationTimeoutMs: 1000
});

try {
  await replay.publish(platform, 'orders', 'created', order);
} catch (err) {
  if (err instanceof ReplicationTimeoutError) {
    // err.ack, err.minReplicas, err.timeoutMs available for logging
    // Caller decides: retry, fail the request, or accept best-effort
  }
  throw err;
}

The data is in the buffer on the master regardless of the WAIT outcome - other instances doing replay() will still see it. Only the local broadcast is suppressed when the durability signal fails. WAIT command-level errors (network / protocol) bubble up as the original error and DO count as a circuit-breaker failure; an under-acked timeout is a separate signal layer and does NOT trip the breaker.
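The ordering of write, WAIT, and broadcast can be sketched as follows. This is a minimal model under assumptions: `publishReplicated`, the `steps` object, and `UnderAckedError` (a stand-in for the library's ReplicationTimeoutError) are hypothetical, and circuit-breaker accounting is left out.

```javascript
// Stand-in error carrying the same fields the docs mention on the real error.
class UnderAckedError extends Error {
  constructor(ack, minReplicas, timeoutMs) {
    super(`only ${ack}/${minReplicas} replicas acked within ${timeoutMs}ms`);
    this.name = 'UnderAckedError';
    this.ack = ack;
    this.minReplicas = minReplicas;
    this.timeoutMs = timeoutMs;
  }
}

// steps.write():        stores the entry on the master, resolves to its seq
// steps.wait(n, ms):    resolves to the number of replicas that acked (WAIT)
// steps.broadcast(seq): delivers the message to live subscribers
async function publishReplicated(steps, { minReplicas, timeoutMs }) {
  const seq = await steps.write(); // from here on the data is in the buffer
  const ack = await steps.wait(minReplicas, timeoutMs); // command errors propagate as-is
  if (ack < minReplicas) {
    // Under-acked: skip the broadcast, but the stored entry is not rolled back.
    throw new UnderAckedError(ack, minReplicas, timeoutMs);
  }
  await steps.broadcast(seq);
  return seq;
}
```

Broadcasting only after a successful WAIT means live consumers never see a message that exists on the master alone.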

Combine with localFanoutOnStorageFailure: true for a live-only fallback when storage itself fails - publish() won’t durably store, but subscribers still receive the message.

| Option | Default | Description |
| --- | --- | --- |
| minReplicas | 1 | Minimum replicas that must ack. Only with durability: 'replicated'. |
| replicationTimeoutMs | 1000 | Per-publish replication timeout in ms. 0 blocks indefinitely (Redis WAIT semantics). |

API

All methods are async (they hit Redis). The API otherwise matches the core plugin:

| Method | Description |
| --- | --- |
| publish(platform, topic, event, data) | Store + broadcast. Throws ReplayStorageError on storage failure (original error on .cause). |
| publishIdempotent(platform, topic, event, data, { producerId, requestId }) | Stream backend only. Deduplicates retries by (producerId, requestId) and resolves to { seq, isDuplicate }. |
| seq(topic) | Current sequence number. |
| since(topic, seq) | Messages after a sequence. |
| gap(topic, lastSeenSeq) | Returns { truncated, missingFrom }. lastSeenSeq=0 short-circuits. |
| replay(ws, topic, sinceSeq, platform, reqId?) | Send missed messages to one client. End event carries { reqId }. |
| resumeHook() | Returns a resume hook that wires lastSeenSeqs to gap-fill. Mount as export const resume = replay.resumeHook();. |
| clear() | Delete all replay data. |
| clearTopic(topic) | Delete replay data for one topic. |

Errors

import { ReplayStorageError, ReplicationTimeoutError } from 'svelte-adapter-uws-extensions/redis/replay';

try {
  await replay.publish(platform, topic, event, data);
} catch (err) {
  if (err instanceof ReplayStorageError) {
    const root = err.cause; // original ioredis error
  }
  if (err instanceof ReplicationTimeoutError) {
    // durability: 'replicated' under-ack
  }
  throw err;
}

replay() consults platform.checkSubscribe(ws, topic) before reading the buffer. Topics the wire-subscribe gate would deny emit a denied event on __replay:{topic} instead of a buffer dump. Older adapters lacking checkSubscribe degrade gracefully.

Resume hook

resumeHook() wires reconnect-driven gap-fill into the adapter’s resume hook:

// hooks.ws.js
export const resume = replay.resumeHook();

The hook iterates over the client's lastSeenSeqs and gap-fills each topic through the existing replay() method. A non-numeric or negative sinceSeq is coerced to 0.
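The coercion and iteration can be sketched like this. It is an illustration under assumptions: lastSeenSeqs is taken to be a plain object mapping topic to sequence, numeric strings are treated as non-numeric (the library may be more lenient), and `normalizeSinceSeq` and `gapFillAll` are hypothetical names.

```javascript
// Coerce anything that is not a finite, non-negative number to 0.
function normalizeSinceSeq(value) {
  const valid = typeof value === 'number' && Number.isFinite(value) && value >= 0;
  return valid ? value : 0;
}

// Call replayTopic(topic, sinceSeq) once per topic the client reported.
// replayTopic stands in for a bound call to replay.replay(ws, topic, sinceSeq, platform).
async function gapFillAll(lastSeenSeqs, replayTopic) {
  for (const [topic, lastSeen] of Object.entries(lastSeenSeqs)) {
    await replayTopic(topic, normalizeSinceSeq(lastSeen));
  }
}
```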
