PubSub class

Topic-based messaging primitive. Powers chat AND general realtime events (cursor positions, custom app events, state sync, polls, etc.). Access via room.pubsub.

Three methods. Topic is just a string. App-defined payloads. Server persists by default. See CHAT_DECISIONS_V1.md for design rationale.

Blueprint

Full surface at a glance.

class PubSub {
  // === Publish ===
  // Send a message to a topic. Resolves with the server-confirmed message (id, timestamp, payload).
  publish<T>(topic: string, payload: T, opts?: PublishOpts): Promise<PubSubMessage<T>>;

  // === Subscribe ===
  // Register a handler for live messages on a topic. Returns a PubSubSubscription handle.
  // Each call is independent; the SDK refcounts the wire subscription internally.
  subscribe<T>(topic: string, handler: (msg: PubSubMessage<T>) => void, opts?: SubscribeOpts): Promise<PubSubSubscription<T>>;

  // === History ===
  // Fetch historic messages from a topic. Pull-based, paginated.
  getHistory<T>(topic: string, opts?: HistoryOpts): Promise<HistoryPage<T>>;
}
Topics are implicit. No "create topic" step. First publish creates the topic server-side. Subscribing to a topic that doesn't exist yet is valid: the handler waits idle until messages arrive. getHistory on an empty topic returns empty results, not an error.
Subscribe does NOT echo own messages. When the sender publishes, their own subscribe handler does not fire for that message. The sender renders their own message via the publish return value. subscribe handlers fire only for messages from other senders.

Methods

publish async

Sends a message to a topic. Resolves with the server-confirmed PubSubMessage, the same shape subscribers receive.

publish<T>(
  topic: string,
  payload: T,
  opts?: PublishOpts,
): Promise<PubSubMessage<T>>
Parameters
  • topic: string. 1 to 128 chars, characters from [a-zA-Z0-9_-:.]. See topic naming rules.
  • payload: app-defined payload, JSON-serializable, < 32 KiB after encoding.
  • opts: optional PublishOpts (persist, to, metadata).
Returns

Resolves with the server-confirmed PubSubMessage<T>, which includes the server-assigned id (snowflake) and timestamp.
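
The payload constraints above can also be checked in app code before calling publish, for example to disable a send button early. The following is a rough sketch under stated assumptions, not SDK code: checkPayload and utf8Length are hypothetical helpers, and the 32 KiB limit is assumed to be measured in UTF-8 bytes of the JSON text. This page gives the boundary both as "< 32 KiB" and "> 32 KiB", so the sketch conservatively treats exactly 32 KiB as too large.

```typescript
// Hypothetical pre-flight check; the SDK performs its own validation.
const MAX_PAYLOAD_BYTES = 32 * 1024; // 32 KiB, assumed to mean UTF-8 bytes of the JSON text

type PayloadCheck = 'OK' | 'INVALID_PAYLOAD' | 'PAYLOAD_TOO_LARGE';

// Count UTF-8 bytes without relying on TextEncoder or Buffer.
function utf8Length(text: string): number {
  let bytes = 0;
  for (let i = 0; i < text.length; i++) {
    const unit = text.charCodeAt(i);
    if (unit < 0x80) bytes += 1;
    else if (unit < 0x800) bytes += 2;
    else if (unit >= 0xd800 && unit <= 0xdbff) {
      const next = text.charCodeAt(i + 1); // NaN at the end of the string
      if (next >= 0xdc00 && next <= 0xdfff) { bytes += 4; i++; } // surrogate pair
      else bytes += 3; // lone surrogate, counted like U+FFFD
    } else bytes += 3;
  }
  return bytes;
}

function checkPayload(payload: unknown): PayloadCheck {
  let json: string | undefined;
  try {
    json = JSON.stringify(payload); // throws on cycles and BigInt values
  } catch {
    return 'INVALID_PAYLOAD';
  }
  if (json === undefined) return 'INVALID_PAYLOAD'; // e.g. undefined or a bare function
  return utf8Length(json) >= MAX_PAYLOAD_BYTES ? 'PAYLOAD_TOO_LARGE' : 'OK';
}
```

A caller might run checkPayload(draft) while the user types and only enable sending on 'OK'; the SDK's own rejection remains the source of truth.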

Example: simple chat message
const msg = await room.pubsub.publish('chat', { text: 'hello' });
console.log(msg.id, msg.timestamp);
// Render in your own UI; subscribe does NOT echo own messages
Example: ephemeral cursor position (no history)
await room.pubsub.publish('cursor', { x: 120, y: 340 }, { persist: false });
// Not stored in history; subscribers receive it but getHistory won't return it later
Example: targeted message (DM)
await room.pubsub.publish('chat', { text: 'private hi' }, {
  to: ['alice'],   // only Alice's subscribe handlers fire
});
Example: metadata for app-level routing
await room.pubsub.publish('events', { action: 'click' }, {
  metadata: { priority: 'low', source: 'main-app' },
});

// Subscriber side: handlers are registered through subscribe
await room.pubsub.subscribe('events', (msg) => {
  if (msg.metadata?.priority === 'high') showToast(msg);
});
Example: optimistic UI (app pattern)
async function sendChat(text) {
  const uiKey = appendToUI({ text, status: 'pending' });
  try {
    const real = await room.pubsub.publish('chat', { text });
    upgradeUI(uiKey, real);
  } catch (err) {
    markFailedUI(uiKey, err);
  }
}

Errors (rejects with)

| Code | Origin | Cause | Retriable |
| --- | --- | --- | --- |
| INVALID_TOPIC_NAME | SDK | Topic violates naming rules | No |
| PAYLOAD_TOO_LARGE | SDK | Payload > 32 KiB after JSON encoding | No |
| INVALID_PAYLOAD | SDK | Payload not JSON-serializable | No |
| NOT_CONNECTED | SDK | Publish before join / after leave | No |
| RATE_LIMIT_EXCEEDED | Server | Per-topic rate cap exceeded | Yes |
| PERMISSION_DENIED | Server | Topic restricted for this client | No |
| NETWORK_ERROR | SDK/Server | Transient; SDK auto-retries 3× before rejecting | Yes |

Retry is automatic for transient errors. SDK retries up to 3 times with exponential backoff (100ms / 500ms / 2s) on NETWORK_ERROR. Idempotency is handled internally, so a retry never produces duplicate messages. App sees one Promise resolve / reject.
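
This page describes automatic retry only for NETWORK_ERROR, so an app may want its own policy for rejections that reach it. The sketch below classifies the codes from the table above and suggests a wait time. It is an illustration under assumptions: nextRetryDelay is a hypothetical helper, reading the code from a `code` property on the rejection is assumed, and reusing the SDK's 100 ms / 500 ms / 2 s schedule at app level is this sketch's choice rather than documented behavior.

```typescript
// Error codes from the publish error table above.
type PublishErrorCode =
  | 'INVALID_TOPIC_NAME' | 'PAYLOAD_TOO_LARGE' | 'INVALID_PAYLOAD' | 'NOT_CONNECTED'
  | 'RATE_LIMIT_EXCEEDED' | 'PERMISSION_DENIED' | 'NETWORK_ERROR';

// Codes the table marks as retriable.
const RETRIABLE: ReadonlyArray<PublishErrorCode> = ['RATE_LIMIT_EXCEEDED', 'NETWORK_ERROR'];

// Schedule quoted for the SDK's own NETWORK_ERROR retries;
// applying it to app-level retries is an assumption of this sketch.
const BACKOFF_MS = [100, 500, 2000];

// Returns how long to wait before retry number `attempt` (0-based),
// or null when the code is not retriable or the attempts are used up.
function nextRetryDelay(code: PublishErrorCode, attempt: number): number | null {
  if (RETRIABLE.indexOf(code) === -1) return null;
  if (attempt < 0 || attempt >= BACKOFF_MS.length) return null;
  return BACKOFF_MS[attempt];
}
```

A caller would catch the rejection, ask nextRetryDelay for a wait, and call publish again, treating null as "show the failure to the user". An app-level retry is a fresh publish; whether the SDK's internal idempotency also covers it is not stated here.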

subscribe async

Registers a handler for live messages on a topic. Returns a PubSubSubscription handle with .unsubscribe().

Each subscribe call is independent. Multiple calls on the same topic create multiple registrations; SDK refcounts the wire subscription internally (only one request to the server per topic per client).

subscribe<T>(
  topic: string,
  handler: (msg: PubSubMessage<T>) => void,
  opts?: SubscribeOpts,
): Promise<PubSubSubscription<T>>
Parameters
  • topic: string topic name.
  • handler: called with each incoming PubSubMessage. Does NOT fire for the local client's own published messages.
  • opts: optional SubscribeOpts. Currently: limit (rate cap, msg/sec).
Returns

Resolves with a PubSubSubscription<T> handle. Includes topic and unsubscribe().

Example: basic subscribe
const sub = await room.pubsub.subscribe('chat', (msg) => {
  console.log(msg.from, msg.payload, msg.timestamp);
});

// Later
await sub.unsubscribe();
Example: type-safe payload
interface ChatPayload { text: string; }

const sub = await room.pubsub.subscribe<ChatPayload>('chat', (msg) => {
  msg.payload.text;   // ← typed
});
Example: history + live, fetched in parallel
// subscribe (live) and getHistory (backlog) are independent, so run them together
const [sub, page] = await Promise.all([
  room.pubsub.subscribe('chat', appendToView),
  room.pubsub.getHistory('chat'),
]);

prependToView(page.messages);   // backlog; [] if the topic is empty, harmless
// sub now delivers every new message via appendToView
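
When the two calls run in parallel, a message could in principle arrive live and also appear in the history page. One way to guard against showing it twice is to merge by id before rendering. A minimal sketch, assuming a message shape with only the fields this page mentions and a numeric timestamp; mergeById is a hypothetical helper, not part of the SDK.

```typescript
// Minimal message shape for this sketch; timestamp is assumed to be a number (ms).
interface Msg<T> { id: string; timestamp: number; payload: T; }

// Merge two batches, drop duplicate ids, and return the result oldest-first.
function mergeById<T>(a: Msg<T>[], b: Msg<T>[]): Msg<T>[] {
  const seen = new Set<string>();
  const merged: Msg<T>[] = [];
  for (const msg of [...a, ...b]) {
    if (seen.has(msg.id)) continue; // same message received via both paths
    seen.add(msg.id);
    merged.push(msg);
  }
  // Order by timestamp, falling back to id so ties sort deterministically.
  return merged.sort((x, y) =>
    x.timestamp - y.timestamp || (x.id < y.id ? -1 : x.id > y.id ? 1 : 0));
}
```

With this in place the view can be rebuilt from mergeById(page.messages, liveBuffer) instead of prepending and appending independently.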
Example: rate cap (1000-person room)
// Cap to 30 msg/sec for human-readable UI
const sub = await room.pubsub.subscribe('chat', renderMessage, { limit: 30 });
// SDK drops oldest messages in window when burst exceeds 30/sec
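
One plausible reading of the drop-oldest rule, sketched as a pure function: messages are grouped into one-second windows and only the newest `limit` of each window reach the handler. How the SDK actually windows and buffers messages is not specified on this page; capPerSecond and the receivedAt field are hypothetical names used for illustration.

```typescript
// receivedAt is a hypothetical arrival time in milliseconds.
interface Timed<T> { receivedAt: number; value: T; }

// Keep at most `limit` items per one-second window, dropping the oldest in each window.
// Items are assumed to be passed in arrival order.
function capPerSecond<T>(items: Timed<T>[], limit: number): Timed<T>[] {
  if (limit <= 0) return [];
  const windows = new Map<number, Timed<T>[]>();
  for (const item of items) {
    const key = Math.floor(item.receivedAt / 1000); // index of the one-second window
    const bucket = windows.get(key) ?? [];
    bucket.push(item);
    windows.set(key, bucket);
  }
  const kept: Timed<T>[] = [];
  const keys = Array.from(windows.keys()).sort((a, b) => a - b);
  for (const key of keys) {
    const bucket = windows.get(key) ?? [];
    kept.push(...bucket.slice(-limit)); // the newest `limit` items survive
  }
  return kept;
}
```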
Example: cross-file subscriptions (each owns its own)
// PageA.tsx
const subA = await room.pubsub.subscribe('chat', renderMessage);
// On unmount: subA.unsubscribe()

// PageB.tsx (different file, doesn't need subA)
const subB = await room.pubsub.subscribe('chat', notifyOnMention);
// On unmount: subB.unsubscribe()

// SDK: refcount = 2 → 1 wire subscription. Both handlers fire per message.
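Example: React useEffect cleanup
useEffect(() => {
  let sub: PubSubSubscription | null = null;
  let cancelled = false;

  (async () => {
    const s = await room.pubsub.subscribe('chat', handler);
    if (cancelled) { s.unsubscribe(); return; }   // unmounted before subscribe resolved
    sub = s;
  })();

  return () => {
    cancelled = true;
    sub?.unsubscribe();   // idempotent, safe in strict mode
  };
}, []);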

Behavior details

| Scenario | Behavior |
| --- | --- |
| Same handler subscribed twice | Two independent subs; handler fires twice per message |
| Subscribe to never-used topic | OK; handler waits idle until first publish |
| Sender's own published messages | Do NOT fire on sender's subscribe handler (only on other peers') |
| sub.unsubscribe() twice on same handle | Idempotent; second call is a no-op, resolves immediately |
| Multiple subs with different limit values | Server respects max(limits); SDK filters per-sub locally |
| One sub has no limit | Server sends full firehose for that topic; other subs filter locally |
| Reconnect mid-subscription | SDK re-subscribes automatically on new connection; handler keeps firing |
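
The refcounting, idempotent unsubscribe, and max(limits) rows above can be modelled in a few lines. This is an in-memory sketch for intuition only, not the SDK's implementation: WireRefCounter, wireLog, and wireLimit are hypothetical names, and closing the wire subscription when the last local sub goes away is an assumption.

```typescript
// In-memory model of per-topic refcounting; records what would go over the wire.
class WireRefCounter {
  private counts = new Map<string, number>();
  readonly wireLog: string[] = [];

  // Registers one local subscription and returns its unsubscribe function.
  subscribe(topic: string): () => void {
    const current = this.counts.get(topic) ?? 0;
    if (current === 0) this.wireLog.push(`SUB ${topic}`); // first local sub opens the wire sub
    this.counts.set(topic, current + 1);

    let active = true;
    return () => {
      if (!active) return; // idempotent: a second call is a no-op
      active = false;
      const left = (this.counts.get(topic) ?? 1) - 1;
      if (left === 0) {
        this.counts.delete(topic);
        this.wireLog.push(`UNSUB ${topic}`); // last local sub closes the wire sub (assumed)
      } else {
        this.counts.set(topic, left);
      }
    };
  }
}

// Limit requested from the server for one topic: the max of the local limits,
// or no limit at all (undefined) if any local sub is uncapped.
function wireLimit(limits: Array<number | undefined>): number | undefined {
  if (limits.length === 0 || limits.some((l) => l === undefined)) return undefined;
  return Math.max(...(limits as number[]));
}
```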

getHistory async

Fetches historic messages from a topic. Pull-based, paginated. Never replays as events on subscribe; explicit fetch keeps wire traffic predictable.

getHistory<T>(
  topic: string,
  opts?: HistoryOpts,
): Promise<HistoryPage<T>>
Parameters
  • topic: string topic name.
  • opts: optional HistoryOpts: limit (page size, default 25, max 50), before / after (a message id, the direction anchor), from (server-side sender filter).
Returns

Resolves with a HistoryPage<T>: { messages, hasMore }. messages is oldest-first; hasMore indicates more exist in the direction queried.

Direction

| Call | Returns |
| --- | --- |
| getHistory('chat') | The newest page. |
| getHistory('chat', { before: id }) | The page of messages just older than id (scroll-up). |
| getHistory('chat', { after: id }) | The page of messages just newer than id (reconnect gap-fill). |

Direction is the parameter name; there is no direction enum. The anchor is a message id (not an opaque cursor), so it works for any message the client holds. Pass before or after, never both.

Example: initial load (newest page)
const page = await room.pubsub.getHistory('chat');
render(page.messages);                       // oldest-first, so render top to bottom
// page.messages is [] on an empty topic, no error
Example: scroll up / load older
let page = await room.pubsub.getHistory('chat');
render(page.messages);

async function loadOlder() {
  if (!page.hasMore) return;                 // no older history
  page = await room.pubsub.getHistory('chat', {
    before: page.messages[0].id,             // oldest id in the current page
  });
  prepend(page.messages);
}
Example: reconnect gap-fill (load newer)
// While connected, capture the id of the last live message
let lastSeenId = null;
await room.pubsub.subscribe('chat', (msg) => {
  append(msg);
  lastSeenId = msg.id;                       // every message carries its own id
});

// --- after a disconnect, then reconnect ---

// Fetch everything published during the outage
let page = await room.pubsub.getHistory('chat', { after: lastSeenId });
append(page.messages);

while (page.hasMore) {                       // more newer messages to catch up on
  page = await room.pubsub.getHistory('chat', {
    after: page.messages.at(-1).id,          // newest id in the current page
  });
  append(page.messages);
}
Example: filter by sender (server-side)
// Only messages from Alice or Bob, filtered server-side, then paged
let page = await room.pubsub.getHistory('chat', { from: ['alice', 'bob'] });
render(page.messages);

if (page.hasMore) {                          // hasMore reflects the FILTERED set
  page = await room.pubsub.getHistory('chat', {
    from:   ['alice', 'bob'],
    before: page.messages[0].id,
  });
}
Pagination has no cursor and no Paginator object. The message id already in the page is the anchor: re-call getHistory with messages[0].id (older) or the last message's id (newer). Every call is independent. See CHAT_DECISIONS_V1.md Decision 22.
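
To make the anchor semantics concrete, here is an in-memory stand-in for getHistory over a plain array, plus the gap-fill loop driven by it. It is a sketch for reasoning about before / after / hasMore, not the server's logic: pageOf and catchUp are hypothetical names, and ids are small numbers for brevity where real ids are server-assigned snowflakes.

```typescript
interface Msg { id: number; }
interface Page { messages: Msg[]; hasMore: boolean; }
interface Opts { limit?: number; before?: number; after?: number; }

// Stand-in for getHistory. `all` holds every message of the topic, oldest-first.
function pageOf(all: Msg[], opts: Opts = {}): Page {
  const { limit = 25, before, after } = opts;
  if (after !== undefined) {
    // Messages just newer than the anchor, still oldest-first.
    const newer = all.filter((m) => m.id > after);
    return { messages: newer.slice(0, limit), hasMore: newer.length > limit };
  }
  // No anchor: newest page. `before`: the page just older than the anchor.
  const older = before !== undefined ? all.filter((m) => m.id < before) : all;
  return {
    messages: older.slice(Math.max(0, older.length - limit)),
    hasMore: older.length > limit,
  };
}

// Gap-fill: keep requesting the page after the newest id held until hasMore is false.
function catchUp(all: Msg[], lastSeenId: number, limit: number): Msg[] {
  const out: Msg[] = [];
  let page = pageOf(all, { after: lastSeenId, limit });
  out.push(...page.messages);
  while (page.hasMore) {
    const newest = page.messages[page.messages.length - 1].id;
    page = pageOf(all, { after: newest, limit });
    out.push(...page.messages);
  }
  return out;
}
```

Because each call carries its own anchor, no state needs to survive between calls beyond the messages the client already holds.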

Errors (rejects with)

| Code | Origin | Cause | Retriable |
| --- | --- | --- | --- |
| INVALID_TOPIC_NAME | SDK | Topic violates naming rules | No |
| INVALID_LIMIT | SDK | limit > 50 or ≤ 0 | No |
| INVALID_HISTORY_OPTS | SDK | Both before and after set | No |
| NOT_CONNECTED | SDK | Called before join / after leave | No |
| NETWORK_ERROR | SDK/Server | Transient; SDK auto-retries before rejecting | Yes |
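
The two option-related rows above translate into a small check. This is a sketch with assumptions called out: validateHistoryOpts and HistoryOptsLike are hypothetical, message ids are assumed to be strings, and the order in which the SDK evaluates the two rules is not documented here.

```typescript
// Assumed shape of HistoryOpts, based on the parameters listed for getHistory.
interface HistoryOptsLike { limit?: number; before?: string; after?: string; from?: string[]; }
type HistoryOptsError = 'INVALID_LIMIT' | 'INVALID_HISTORY_OPTS';

// Returns the error code the table associates with these options, or null if none applies.
function validateHistoryOpts(opts: HistoryOptsLike = {}): HistoryOptsError | null {
  const { limit, before, after } = opts;
  if (limit !== undefined && (limit <= 0 || limit > 50)) return 'INVALID_LIMIT';
  if (before !== undefined && after !== undefined) return 'INVALID_HISTORY_OPTS';
  return null;
}
```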

Topic naming rules

| Rule | Value |
| --- | --- |
| Allowed characters | [a-zA-Z0-9_-:.]: alphanumeric, underscore, hyphen, colon, period |
| Min length | 1 character (non-empty) |
| Max length | 128 characters |
| Reserved prefix | None; apps may use any prefix including __ |
| Reserved names | None |
| Case-sensitive | Yes; chat and Chat are different topics |
Valid vs invalid
'chat'             // ✅
'dm:alice:bob'     // ✅ colons for namespacing
'mods-only'        // ✅ hyphen
'app.events.v2'    // ✅ periods
'__system'         // ✅ no reserved prefix
'chat room'        // ❌ INVALID_TOPIC_NAME: space
''                 // ❌ INVALID_TOPIC_NAME: empty
'a'.repeat(200)    // ❌ INVALID_TOPIC_NAME: too long
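
The rules above fit in a single regular expression. A minimal sketch, assuming the listed character set is exhaustive; isValidTopic is a hypothetical helper and the SDK's own check may differ in detail. Note the hyphen is moved to the end of the character class so the regex engine reads it literally rather than as a range.

```typescript
// 1 to 128 characters drawn from letters, digits, underscore, colon, period, hyphen.
const TOPIC_NAME = /^[A-Za-z0-9_:.-]{1,128}$/;

// True when the topic satisfies the naming rules in the table above.
function isValidTopic(topic: string): boolean {
  return TOPIC_NAME.test(topic);
}
```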

Design rationale

For the full design history and alternatives considered, see CHAT_DECISIONS_V1.md and per-SDK studies in chat-doc/.

Why one API instead of separate room.chat + room.pubsub: messaging is topics. Chat is a use case (publish to a topic with chat-shaped payload), not a separate product. One mental model, one verb (publish), no API duplication.

Why no first-class reactions/typing/threads in v1: apps build these on top of publish using their own topic schemas. First-class APIs ship in v1.x once usage patterns are validated.

Open questions

Items still being designed:

See also: Room.pubsub, PubSubMessage, PublishOpts, PubSubSubscription, CHAT_DECISIONS_V1.md