From 4d5e23cad2f4c0e8f9c0366ba250bca233e800dd Mon Sep 17 00:00:00 2001 From: Philipinho <16838612+Philipinho@users.noreply.github.com> Date: Mon, 26 Jan 2026 23:39:09 +0000 Subject: [PATCH] sync with latest --- .../redis-sync/collab-proxy-socket.ts | 8 +- .../redis-sync/redis-sync.extension.ts | 128 ++++++++---------- .../extensions/redis-sync/redis-sync.types.ts | 2 +- 3 files changed, 60 insertions(+), 78 deletions(-) diff --git a/apps/server/src/collaboration/extensions/redis-sync/collab-proxy-socket.ts b/apps/server/src/collaboration/extensions/redis-sync/collab-proxy-socket.ts index 32787f0a..8ecfa00a 100644 --- a/apps/server/src/collaboration/extensions/redis-sync/collab-proxy-socket.ts +++ b/apps/server/src/collaboration/extensions/redis-sync/collab-proxy-socket.ts @@ -9,7 +9,7 @@ import type { export class CollabProxySocket extends EventEmitter { private readonly replyTo: string; - private readonly pongChannel: string; + private readonly serverChannel: string; private readonly socketId: string; private pub: RedisClient; private readonly pack: Pack; @@ -19,13 +19,13 @@ export class CollabProxySocket extends EventEmitter { pub: RedisClient, pack: Pack, replyTo: string, - pongChannel: string, + serverChannel: string, socketId: string, ) { super(); this.replyTo = replyTo; - this.pongChannel = pongChannel; this.socketId = socketId; + this.serverChannel = serverChannel; this.pub = pub; this.pack = pack; this.once('close', () => { @@ -53,7 +53,7 @@ export class CollabProxySocket extends EventEmitter { const msg: RSAMessagePing = { type: 'ping', socketId: this.socketId, - respondTo: this.pongChannel, + replyTo: this.serverChannel, }; this.publish(msg); } diff --git a/apps/server/src/collaboration/extensions/redis-sync/redis-sync.extension.ts b/apps/server/src/collaboration/extensions/redis-sync/redis-sync.extension.ts index 37f50e3e..5b78b9c0 100644 --- a/apps/server/src/collaboration/extensions/redis-sync/redis-sync.extension.ts +++ b/apps/server/src/collaboration/extensions/redis-sync/redis-sync.extension.ts @@ -1,18 +1,15 @@ // Source https://github.com/ueberdosis/hocuspocus/pull/1008 - MIT -import { IncomingMessage } from 'node:http'; import { Extension, Hocuspocus, - IncomingMessage as SocketIncomingMessage, + IncomingMessage, afterUnloadDocumentPayload, onConfigurePayload, onLoadDocumentPayload, } from '@hocuspocus/server'; import RedisClient from 'ioredis'; import { readVarString } from 'lib0/decoding.js'; -import { WebSocket } from 'ws'; import { CollabProxySocket } from './collab-proxy-socket'; -import { Logger } from '@nestjs/common'; import { BaseWebSocket, Configuration, @@ -36,7 +33,6 @@ type DocumentName = string; type SocketId = string; export class RedisSyncExtension implements Extension { - private readonly logger = new Logger('Collab' + RedisSyncExtension.name); priority = 1000; private readonly pub: RedisClient; private sub: RedisClient; @@ -45,25 +41,20 @@ export class RedisSyncExtension implements Extension { private originSockets: Record = {}; private locks: Record = {}; private lockPromises: Record> = {}; - private proxySockets: Record< - SocketId, - { socket: CollabProxySocket; cleanup: NodeJS.Timeout } - > = {}; + private proxySockets: Record = {}; private readonly prefix: string; private readonly lockPrefix: string; private readonly msgChannel: string; private readonly serverId: ServerId; private readonly customEventTTL: number; private readonly lockTTL: number; - private readonly proxySocketTTL: number; private instance!: Hocuspocus; private readonly customEvents: TCE; - private replyIdCounter = 0; - private pendingReplies: Record< - number, - // @ts-ignore - PromiseWithResolvers['resolve'] - > = {}; + private replyIdCounter: number = 0; + // @ts-ignore + private pendingReplies: Record['resolve']> = + {}; + constructor(configuration: Configuration) { const { redis, @@ -72,7 +63,6 @@ export class RedisSyncExtension implements Extension { serverId, lockTTL, prefix, - proxySocketTTL, customEvents, customEventTTL, } = configuration; @@ -82,12 +72,11 @@ export class RedisSyncExtension implements Extension { this.unpack = unpack; this.serverId = serverId; this.lockTTL = lockTTL ?? 10_000; - this.proxySocketTTL = proxySocketTTL ?? 30_000; this.customEventTTL = customEventTTL ?? 30_000; this.prefix = prefix ?? 'collab'; this.lockPrefix = `${this.prefix}Lock`; this.msgChannel = `${this.prefix}Msg`; - this.customEvents = (customEvents ?? {}) as unknown as TCE; + this.customEvents = (customEvents as any) ?? ({} as any as CustomEvents); this.sub.subscribe(this.msgChannel, `${this.msgChannel}:${this.serverId}`); this.sub.on('messageBuffer', this.handleRedisMessage); } @@ -96,18 +85,19 @@ export class RedisSyncExtension implements Extension { } private closeProxy(socketId: string) { - const socketRecord = this.proxySockets[socketId]; - if (!socketRecord) return; - clearTimeout(socketRecord.cleanup); - socketRecord.socket.emit('close', 1000, 'proxy_cleanup'); - delete this.proxySockets[socketId]; + const proxySocket = this.proxySockets[socketId]; + if (proxySocket) { + proxySocket.emit( + 'close', + 1000, + Buffer.from('provider_initiated', 'utf-8'), + ); + delete this.proxySockets[socketId]; + } } - private emitPong(socketId: string) { - const socketRecord = this.proxySockets[socketId]; - if (socketRecord) { - socketRecord.socket.emit('pong'); - } + private pongProxy(socketId: string) { + this.proxySockets[socketId]?.emit('pong'); } private handleProxyMessage( @@ -115,35 +105,24 @@ export class RedisSyncExtension implements Extension { ) { const { replyTo, message, serializedHTTPRequest } = msg; const { headers } = serializedHTTPRequest; - const socketId = headers['sec-websocket-key']; - let socketRecord = this.proxySockets[socketId]; - const cleanup = setTimeout(() => { - const record = this.proxySockets[socketId]; - if (record) { - record.socket.emit('close', 1000, 'ttl_expired'); - delete this.proxySockets[socketId]; - } - }, this.proxySocketTTL); - if (!socketRecord) { - const socket = new CollabProxySocket( + const socketId = headers['sec-websocket-key']!; + let socket = this.proxySockets[socketId]; + if (!socket) { + socket = new CollabProxySocket( this.pub, this.pack, replyTo, `${this.msgChannel}:${this.serverId}`, socketId, ); - socketRecord = { socket, cleanup }; - this.proxySockets[socketId] = socketRecord; + this.proxySockets[socketId] = socket; this.instance.handleConnection( - socket as unknown as WebSocket, - serializedHTTPRequest as unknown as IncomingMessage, + socket as any, + serializedHTTPRequest as any, {}, ); - } else { - clearTimeout(socketRecord.cleanup); - socketRecord.cleanup = cleanup; } - socketRecord.socket.emit('message', message); + socket.emit('message', message); } private getOrClaimLock(documentName: string) { @@ -185,6 +164,10 @@ export class RedisSyncExtension implements Extension { this.closeProxy(msg.socketId); return; } + if (type === 'pong') { + this.pongProxy(msg.socketId); + return; + } if (type === 'unload') { delete this.lockPromises[msg.documentName]; return; @@ -201,7 +184,7 @@ export class RedisSyncExtension implements Extension { replyId, payload: res, }; - this.pub.publish(`${replyTo}`, this.pack(reply)).catch(() => {}); + this.pub.publish(`${replyTo}`, this.pack(reply)); return; } if (type === 'customEventComplete') { @@ -212,10 +195,6 @@ export class RedisSyncExtension implements Extension { resolveFn(payload); return; } - if (type === 'pong') { - this.emitPong(msg.socketId); - return; - } const { socketId } = msg; const socket = this.originSockets[socketId]; if (!socket) { @@ -225,9 +204,14 @@ export class RedisSyncExtension implements Extension { if (type === 'close') { socket.close(msg.code, msg.reason); } else if (type === 'ping') { - const { respondTo } = msg; - const pong: RSAMessagePong = { type: 'pong', socketId }; - this.pub.publish(respondTo, this.pack(pong)).catch(() => {}); + // Reply instantly to the proxy socket, without forwarding to client + // The origin socket handles heartbeat for itself + const { replyTo, socketId } = msg; + const reply: RSAMessagePong = { + type: 'pong', + socketId, + }; + this.pub.publish(`${replyTo}`, this.pack(reply)); } else if (type === 'send') { socket.send(msg.message); } @@ -253,7 +237,7 @@ export class RedisSyncExtension implements Extension { private async handleEventLocally>( eventName: TName, documentName: string, - payload: unknown, + payload: any, ) { const handler = this.customEvents[eventName]; if (!handler) throw new Error(`Invalid eventName: ${eventName}`); @@ -264,7 +248,7 @@ export class RedisSyncExtension implements Extension { async handleEvent>( eventName: TName, documentName: string, - payload: unknown, + payload: any, ) { const isDocLoadedOnInstance = this.instance.documents.has(documentName); @@ -286,17 +270,13 @@ export class RedisSyncExtension implements Extension { type: 'customEventStart', }; const msg = this.pack(proxyMessage); - this.pub.publish(`${this.msgChannel}:${proxyTo}`, msg).catch(() => {}); + this.pub.publish(`${this.msgChannel}:${proxyTo}`, msg); // @ts-ignore const { promise, resolve, reject } = Promise.withResolvers(); - const timeoutId = setTimeout(() => { - delete this.pendingReplies[replyId]; + this.pendingReplies[replyId] = resolve; + setTimeout(() => { reject('TIMEOUT'); }, this.customEventTTL); - this.pendingReplies[replyId] = (result: unknown) => { - clearTimeout(timeoutId); - resolve(result); - }; return promise as Promise>; } // This server owns the document, but hocuspocus hasn't loaded it yet @@ -318,11 +298,11 @@ export class RedisSyncExtension implements Extension { serializedHTTPRequest: SerializedHTTPRequest, context = {}, ) { - const socketId = serializedHTTPRequest.headers['sec-websocket-key']; + const socketId = serializedHTTPRequest.headers['sec-websocket-key']!; this.originSockets[socketId] = ws; this.instance.handleConnection( - ws as unknown as WebSocket, - serializedHTTPRequest as unknown as IncomingMessage, + ws as any, + serializedHTTPRequest as any, context, ); } @@ -332,9 +312,8 @@ export class RedisSyncExtension implements Extension { serializedHTTPRequest: SerializedHTTPRequest, detachableMsg: ArrayBuffer, ) { - // @ts-ignore const message = new Uint8Array(detachableMsg.slice()); - const tmpMsg = new SocketIncomingMessage(detachableMsg); + const tmpMsg = new IncomingMessage(detachableMsg); const documentName = readVarString(tmpMsg.decoder); const isDocLoadedOnInstance = this.instance.documents.has(documentName); @@ -353,7 +332,7 @@ export class RedisSyncExtension implements Extension { type: 'proxy', }; const msg = this.pack(proxyMessage); - this.pub.publish(`${this.msgChannel}:${proxyTo}`, msg).catch(() => {}); + this.pub.publish(`${this.msgChannel}:${proxyTo}`, msg); return; } // This server owns the document, but hocuspocus hasn't loaded it yet @@ -363,8 +342,10 @@ export class RedisSyncExtension implements Extension { onSocketClose(socketId: string, code?: number, reason?: ArrayBuffer) { const socket = this.originSockets[socketId]; if (!socket) return; + // at this point the socket is considered GC'd and we cannot call close + // The origin socket did not set up any connections for the proxy, so none of the hooks will work if we just emit + socket?.emit('close', code, reason); delete this.originSockets[socketId]; - socket.emit('close', code, reason); const msg: RSAMessageCloseProxy = { type: 'closeProxy', socketId }; this.pub.publish(this.msgChannel, this.pack(msg)).catch(() => {}); } @@ -385,8 +366,9 @@ export class RedisSyncExtension implements Extension { this.releaseLock(documentName); // Broadcast to cluster to immediately remove the cached redis value const msg: RSAMessageUnload = { type: 'unload', documentName }; - this.pub.publish(this.msgChannel, this.pack(msg)).catch(() => {}); + this.pub.publish(this.msgChannel, this.pack(msg)); } + async onDestroy() { this.pub.disconnect(false); this.sub.disconnect(false); diff --git a/apps/server/src/collaboration/extensions/redis-sync/redis-sync.types.ts b/apps/server/src/collaboration/extensions/redis-sync/redis-sync.types.ts index 7762fc0f..dd92cacd 100644 --- a/apps/server/src/collaboration/extensions/redis-sync/redis-sync.types.ts +++ b/apps/server/src/collaboration/extensions/redis-sync/redis-sync.types.ts @@ -45,7 +45,7 @@ export type RSAMessageClose = { export type RSAMessagePing = { type: 'ping'; socketId: string; - respondTo: string; + replyTo: string; }; export type RSAMessagePong = {