diff --git a/apps/server/package.json b/apps/server/package.json index 71e68679..edecf07a 100644 --- a/apps/server/package.json +++ b/apps/server/package.json @@ -76,8 +76,10 @@ "kysely-migration-cli": "^0.4.2", "kysely-postgres-js": "^3.0.0", "ldapts": "^7.4.0", + "lib0": "^0.2.117", "mammoth": "^1.11.0", "mime-types": "^2.1.35", + "msgpackr": "^1.11.8", "nanoid": "3.3.11", "nestjs-kysely": "^1.2.0", "nestjs-pino": "^4.5.0", @@ -102,6 +104,7 @@ "socket.io": "^4.8.3", "stripe": "^17.5.0", "tmp-promise": "^3.0.3", + "tseep": "^1.3.1", "typesense": "^2.1.0", "ws": "^8.19.0", "yauzl": "^3.2.0" diff --git a/apps/server/src/collaboration/adapter/collab-ws.adapter.ts b/apps/server/src/collaboration/adapter/collab-ws.adapter.ts index 352fe01f..18685bf0 100644 --- a/apps/server/src/collaboration/adapter/collab-ws.adapter.ts +++ b/apps/server/src/collaboration/adapter/collab-ws.adapter.ts @@ -30,14 +30,22 @@ export class CollabWsAdapter { return this.wss; } - public destroy() { + public close() { try { - this.wss.clients.forEach((client) => { - client.terminate(); - }); this.wss.close(); } catch (err) { console.error(err); } } + + public destroy() { + try { + this.wss.close(); + this.wss.clients.forEach((client) => { + client.terminate(); + }); + } catch (err) { + console.error(err); + } + } } diff --git a/apps/server/src/collaboration/collaboration.gateway.ts b/apps/server/src/collaboration/collaboration.gateway.ts index f1d50671..b296c520 100644 --- a/apps/server/src/collaboration/collaboration.gateway.ts +++ b/apps/server/src/collaboration/collaboration.gateway.ts @@ -1,10 +1,9 @@ -import { Hocuspocus, Server as HocuspocusServer } from '@hocuspocus/server'; +import { Hocuspocus } from '@hocuspocus/server'; import { IncomingMessage } from 'http'; import WebSocket from 'ws'; import { AuthenticationExtension } from './extensions/authentication.extension'; import { PersistenceExtension } from './extensions/persistence.extension'; import { Injectable } from '@nestjs/common'; -import { Redis } from '@hocuspocus/extension-redis'; import { EnvironmentService } from '../integrations/environment/environment.service'; import { createRetryStrategy, @@ -12,19 +11,39 @@ import { RedisConfig, } from '../common/helpers'; import { LoggerExtension } from './extensions/logger.extension'; +import { + RedisSyncExtension, + SerializedHTTPRequest, +} from './extensions/redis-sync'; +import { WsSocketWrapper } from './extensions/redis-sync/ws-socket-wrapper'; +import RedisClient from 'ioredis'; +import { pack, unpack } from 'msgpackr'; +import { nanoid } from 'nanoid'; +import * as os from 'node:os'; +import { CollabWsAdapter } from './adapter/collab-ws.adapter'; +import { + CollaborationHandler, + CollabEventHandlers, +} from './collaboration.handler'; @Injectable() export class CollaborationGateway { - private hocuspocus: Hocuspocus; + private readonly hocuspocus: Hocuspocus; private redisConfig: RedisConfig; + // @ts-ignore + private readonly redisSync: RedisSyncExtension | null = + null; + private readonly withRedis: boolean; constructor( private authenticationExtension: AuthenticationExtension, private persistenceExtension: PersistenceExtension, private loggerExtension: LoggerExtension, private environmentService: EnvironmentService, + private collabEventsService: CollaborationHandler, ) { this.redisConfig = parseRedisUrl(this.environmentService.getRedisUrl()); + this.withRedis = !this.environmentService.isCollabDisableRedis(); this.hocuspocus = new Hocuspocus({ debounce: 10000, @@ -34,26 +53,80 @@ export class CollaborationGateway { this.authenticationExtension, this.persistenceExtension, this.loggerExtension, - ...(this.environmentService.isCollabDisableRedis() - ? [] - : [ - new Redis({ - host: this.redisConfig.host, - port: this.redisConfig.port, - options: { - password: this.redisConfig.password, - db: this.redisConfig.db, - family: this.redisConfig.family, - retryStrategy: createRetryStrategy(), - }, - }), - ]), ], }); + + if (this.withRedis) { + // @ts-ignore + this.redisSync = new RedisSyncExtension({ + redis: new RedisClient({ + host: this.redisConfig.host, + port: this.redisConfig.port, + password: this.redisConfig.password, + db: this.redisConfig.db, + family: this.redisConfig.family, + retryStrategy: createRetryStrategy(), + }), + serverId: `collab-${os?.hostname()}-${nanoid(10)}`, + prefix: 'collab', + pack, + unpack, + // @ts-ignore + customEvents: this.collabEventsService.getHandlers(this.hocuspocus), + }); + this.hocuspocus.configuration.extensions.push(this.redisSync); + // @ts-ignore + this.redisSync.onConfigure({ instance: this.hocuspocus }); + } + } + + private serializeRequest(request: IncomingMessage): SerializedHTTPRequest { + return { + method: request.method ?? 'GET', + url: request.url ?? '/', + headers: { + 'sec-websocket-key': request.headers['sec-websocket-key'] ?? '', + 'sec-websocket-protocol': + request.headers['sec-websocket-protocol'] ?? '', + }, + socket: { remoteAddress: request.socket?.remoteAddress ?? '' }, + }; } handleConnection(client: WebSocket, request: IncomingMessage): any { - this.hocuspocus.handleConnection(client, request); + if (this.redisSync) { + const serializedHTTPRequest = this.serializeRequest(request); + const socketId = serializedHTTPRequest.headers['sec-websocket-key']; + + // Create wrapper socket that only receives events via emit() + // This prevents double-handling since Hocuspocus won't listen to raw WebSocket events + const wrappedSocket = new WsSocketWrapper(client); + + // Route through RedisSync extension (this calls handleConnection internally) + this.redisSync.onSocketOpen(wrappedSocket as any, serializedHTTPRequest); + + // Forward raw WebSocket messages to the extension + client.on('message', (data: ArrayBuffer) => { + this.redisSync!.onSocketMessage( + wrappedSocket as any, + serializedHTTPRequest, + data, + ); + }); + + // Forward close events + client.on('close', (code: number, reason: Buffer) => { + this.redisSync!.onSocketClose(socketId, code, reason); + }); + + // Forward pong events for keepalive + client.on('pong', (data: Buffer) => { + wrappedSocket.emit('pong', data); + }); + } else { + // Fallback to direct Hocuspocus connection + this.hocuspocus.handleConnection(client, request); + } } getConnectionCount() { @@ -64,7 +137,52 @@ export class CollaborationGateway { return this.hocuspocus.getDocumentsCount(); } - async destroy(): Promise { - //await this.hocuspocus.destroy(); + handleYjsEvent( + eventName: TName, + documentName: string, + payload: Parameters[1], + ) { + return this.redisSync?.handleEvent(eventName, documentName, payload); + } + + openDirectConnection(documentName: string, context?: any) { + return this.hocuspocus.openDirectConnection(documentName, context); + } + + /* + *Can be used before calling openDirectConnection directly + */ + async lockDocument(documentName: string) { + return this.redisSync.lockDocument(documentName); + } + + /* + *Releases a document lock and stops the interval that maintains it. + */ + async releaseLock(documentName: string) { + return this.redisSync.releaseLock(documentName); + } + + async destroy(collabWsAdapter: CollabWsAdapter): Promise { + // eslint-disable-next-line no-async-promise-executor + await new Promise(async (resolve) => { + try { + // Wait for all documents to unload + this.hocuspocus.configuration.extensions.push({ + async afterUnloadDocument({ instance }) { + if (instance.getDocumentsCount() === 0) resolve(''); + }, + }); + + collabWsAdapter?.close(); + + if (this.hocuspocus.getDocumentsCount() === 0) resolve(''); + this.hocuspocus.closeConnections(); + } catch (error) { + console.error(error); + } + }); + + await this.hocuspocus.hooks('onDestroy', { instance: this.hocuspocus }); } } diff --git a/apps/server/src/collaboration/collaboration.handler.ts b/apps/server/src/collaboration/collaboration.handler.ts new file mode 100644 index 00000000..ec746550 --- /dev/null +++ b/apps/server/src/collaboration/collaboration.handler.ts @@ -0,0 +1,42 @@ +import { Injectable, Logger } from '@nestjs/common'; +import { Hocuspocus, Document } from '@hocuspocus/server'; + +export type CollabEventHandlers = ReturnType< + CollaborationHandler['getHandlers'] +>; + +@Injectable() +export class CollaborationHandler { + private readonly logger = new Logger(CollaborationHandler.name); + + constructor() {} + + getHandlers(hocuspocus: Hocuspocus) { + return { + alterState: async (documentName: string, payload: { pageId: string }) => { + // dummy + // this.logger.log('Processing', documentName, payload); + // await this.withYdocConnection(hocuspocus, documentName, {}, (doc) => { + // const fragment = doc.getXmlFragment('default'); + //}); + }, + }; + } + + async withYdocConnection( + hocuspocus: Hocuspocus, + documentName: string, + context: any = {}, + fn: (doc: Document) => void, + ): Promise { + const connection = await hocuspocus.openDirectConnection( + documentName, + context, + ); + try { + await connection.transact(fn); + } finally { + await connection.disconnect(); + } + } +} diff --git a/apps/server/src/collaboration/collaboration.module.ts b/apps/server/src/collaboration/collaboration.module.ts index 30cb0ccf..e9374c53 100644 --- a/apps/server/src/collaboration/collaboration.module.ts +++ b/apps/server/src/collaboration/collaboration.module.ts @@ -9,6 +9,7 @@ import { WebSocket } from 'ws'; import { TokenModule } from '../core/auth/token.module'; import { HistoryListener } from './listeners/history.listener'; import { LoggerExtension } from './extensions/logger.extension'; +import { CollaborationHandler } from './collaboration.handler'; @Module({ providers: [ @@ -17,6 +18,7 @@ import { LoggerExtension } from './extensions/logger.extension'; PersistenceExtension, LoggerExtension, HistoryListener, + CollaborationHandler, ], exports: [CollaborationGateway], imports: [TokenModule], @@ -46,16 +48,12 @@ export class CollaborationModule implements OnModuleInit, OnModuleDestroy { }); wss.on('error', (error) => - this.logger.log('WebSocket server error:', error), + this.logger.error('WebSocket server error:', error), ); } async onModuleDestroy(): Promise { - if (this.collaborationGateway) { - await this.collaborationGateway.destroy(); - } - if (this.collabWsAdapter) { - this.collabWsAdapter.destroy(); - } + await this.collaborationGateway?.destroy(this.collabWsAdapter); + this.collabWsAdapter?.destroy(); } } diff --git a/apps/server/src/collaboration/extensions/logger.extension.ts b/apps/server/src/collaboration/extensions/logger.extension.ts index 969fa712..bbca47bd 100644 --- a/apps/server/src/collaboration/extensions/logger.extension.ts +++ b/apps/server/src/collaboration/extensions/logger.extension.ts @@ -9,11 +9,11 @@ import { Injectable, Logger } from '@nestjs/common'; export class LoggerExtension implements Extension { private readonly logger = new Logger('Collab' + LoggerExtension.name); - async onDisconnect(data: onDisconnectPayload) { - this.logger.debug(`User disconnected from "${data.documentName}".`); - } - async afterUnloadDocument(data: onLoadDocumentPayload) { this.logger.debug('Unloaded ' + data.documentName + ' from memory'); } + + async onDisconnect(data: onDisconnectPayload) { + this.logger.debug('User disconnected from ' + data.documentName); + } } diff --git a/apps/server/src/collaboration/extensions/redis-sync/collab-proxy-socket.ts b/apps/server/src/collaboration/extensions/redis-sync/collab-proxy-socket.ts new file mode 100644 index 00000000..8ecfa00a --- /dev/null +++ b/apps/server/src/collaboration/extensions/redis-sync/collab-proxy-socket.ts @@ -0,0 +1,70 @@ +import type RedisClient from 'ioredis'; +import { EventEmitter } from 'tseep'; +import type { + Pack, + RSAMessageClose, + RSAMessagePing, + RSAMessageSend, +} from './redis-sync.types'; + +export class CollabProxySocket extends EventEmitter { + private readonly replyTo: string; + private readonly serverChannel: string; + private readonly socketId: string; + private pub: RedisClient; + private readonly pack: Pack; + readyState = 1; + + constructor( + pub: RedisClient, + pack: Pack, + replyTo: string, + serverChannel: string, + socketId: string, + ) { + super(); + this.replyTo = replyTo; + this.socketId = socketId; + this.serverChannel = serverChannel; + this.pub = pub; + this.pack = pack; + this.once('close', () => { + this.readyState = 3; + }); + } + + private publish(msg: RSAMessageClose | RSAMessagePing | RSAMessageSend) { + this.pub.publish(this.replyTo, this.pack(msg)); + } + + close(code?: number, reason?: string) { + if (this.readyState !== 1) return; + const msg: RSAMessageClose = { + type: 'close', + code, + reason, + socketId: this.socketId, + }; + this.publish(msg); + } + + ping() { + if (this.readyState !== 1) return; + const msg: RSAMessagePing = { + type: 'ping', + socketId: this.socketId, + replyTo: this.serverChannel, + }; + this.publish(msg); + } + + send(message: Uint8Array) { + if (this.readyState !== 1) return; + const msg: RSAMessageSend = { + type: 'send', + socketId: this.socketId, + message, + }; + this.publish(msg); + } +} diff --git a/apps/server/src/collaboration/extensions/redis-sync/index.ts b/apps/server/src/collaboration/extensions/redis-sync/index.ts new file mode 100644 index 00000000..a5847477 --- /dev/null +++ b/apps/server/src/collaboration/extensions/redis-sync/index.ts @@ -0,0 +1,2 @@ +export * from './redis-sync.extension'; +export type { SerializedHTTPRequest } from './redis-sync.extension'; diff --git a/apps/server/src/collaboration/extensions/redis-sync/redis-sync.extension.ts b/apps/server/src/collaboration/extensions/redis-sync/redis-sync.extension.ts new file mode 100644 index 00000000..5b78b9c0 --- /dev/null +++ b/apps/server/src/collaboration/extensions/redis-sync/redis-sync.extension.ts @@ -0,0 +1,376 @@ +// Source https://github.com/ueberdosis/hocuspocus/pull/1008 - MIT +import { + Extension, + Hocuspocus, + IncomingMessage, + afterUnloadDocumentPayload, + onConfigurePayload, + onLoadDocumentPayload, +} from '@hocuspocus/server'; +import RedisClient from 'ioredis'; +import { readVarString } from 'lib0/decoding.js'; +import { CollabProxySocket } from './collab-proxy-socket'; +import { + BaseWebSocket, + Configuration, + CustomEvents, + Pack, + RSAMessage, + RSAMessageCloseProxy, + RSAMessageCustomEventComplete, + RSAMessageCustomEventStart, + RSAMessagePong, + RSAMessageProxy, + RSAMessageUnload, + SerializedHTTPRequest, + Unpack, +} from './redis-sync.types'; + +export type { Pack, SerializedHTTPRequest } from './redis-sync.types'; + +type ServerId = string; +type DocumentName = string; +type SocketId = string; + +export class RedisSyncExtension implements Extension { + priority = 1000; + private readonly pub: RedisClient; + private sub: RedisClient; + private readonly pack: Pack; + private readonly unpack: Unpack; + private originSockets: Record = {}; + private locks: Record = {}; + private lockPromises: Record> = {}; + private proxySockets: Record = {}; + private readonly prefix: string; + private readonly lockPrefix: string; + private readonly msgChannel: string; + private readonly serverId: ServerId; + private readonly customEventTTL: number; + private readonly lockTTL: number; + private instance!: Hocuspocus; + private readonly customEvents: TCE; + private replyIdCounter: number = 0; + // @ts-ignore + private pendingReplies: Record['resolve']> = + {}; + + constructor(configuration: Configuration) { + const { + redis, + pack, + unpack, + serverId, + lockTTL, + prefix, + customEvents, + customEventTTL, + } = configuration; + this.pub = redis.duplicate(); + this.sub = redis.duplicate(); + this.pack = pack; + this.unpack = unpack; + this.serverId = serverId; + this.lockTTL = lockTTL ?? 10_000; + this.customEventTTL = customEventTTL ?? 30_000; + this.prefix = prefix ?? 'collab'; + this.lockPrefix = `${this.prefix}Lock`; + this.msgChannel = `${this.prefix}Msg`; + this.customEvents = (customEvents as any) ?? ({} as any as CustomEvents); + this.sub.subscribe(this.msgChannel, `${this.msgChannel}:${this.serverId}`); + this.sub.on('messageBuffer', this.handleRedisMessage); + } + private getKey(documentName: string) { + return `${this.lockPrefix}:${documentName}`; + } + + private closeProxy(socketId: string) { + const proxySocket = this.proxySockets[socketId]; + if (proxySocket) { + proxySocket.emit( + 'close', + 1000, + Buffer.from('provider_initiated', 'utf-8'), + ); + delete this.proxySockets[socketId]; + } + } + + private pongProxy(socketId: string) { + this.proxySockets[socketId]?.emit('pong'); + } + + private handleProxyMessage( + msg: Pick, + ) { + const { replyTo, message, serializedHTTPRequest } = msg; + const { headers } = serializedHTTPRequest; + const socketId = headers['sec-websocket-key']!; + let socket = this.proxySockets[socketId]; + if (!socket) { + socket = new CollabProxySocket( + this.pub, + this.pack, + replyTo, + `${this.msgChannel}:${this.serverId}`, + socketId, + ); + this.proxySockets[socketId] = socket; + this.instance.handleConnection( + socket as any, + serializedHTTPRequest as any, + {}, + ); + } + socket.emit('message', message); + } + + private getOrClaimLock(documentName: string) { + const lockPromise = this.pub.set( + this.getKey(documentName), + this.serverId, + 'PX', + this.lockTTL, + 'NX', + 'GET', + ); + this.lockPromises[documentName] = lockPromise; + // Briefly cache the serverId that claimed the doc to reduce load on redis + // When the claimant unloads the doc, it will send an unload message to immediately clear this + // a lockTTL / 2 guarantees stale reads < lockTTL upon server crash + setTimeout(() => { + delete this.lockPromises[documentName]; + }, this.lockTTL / 2); + return lockPromise; + } + + private getOrClaimLockThrottled(documentName: string) { + const existingWorkerIdPromise = this.lockPromises[documentName]; + if (existingWorkerIdPromise) return existingWorkerIdPromise; + return this.getOrClaimLock(documentName); + } + + private handleRedisMessage = async ( + _channel: Buffer, + packedMessage: Buffer, + ) => { + const msg = this.unpack(packedMessage) as RSAMessage; + const { type } = msg; + if (type === 'proxy') { + this.handleProxyMessage(msg); + return; + } + if (type === 'closeProxy') { + this.closeProxy(msg.socketId); + return; + } + if (type === 'pong') { + this.pongProxy(msg.socketId); + return; + } + if (type === 'unload') { + delete this.lockPromises[msg.documentName]; + return; + } + if (type === 'customEventStart') { + const { documentName, eventName, payload, replyTo, replyId } = msg; + const res = await this.handleEventLocally( + eventName as Extract, + documentName, + payload, + ); + const reply: RSAMessageCustomEventComplete = { + type: 'customEventComplete', + replyId, + payload: res, + }; + this.pub.publish(`${replyTo}`, this.pack(reply)); + return; + } + if (type === 'customEventComplete') { + const { replyId, payload } = msg; + const resolveFn = this.pendingReplies[replyId]; + if (!resolveFn) return; + delete this.pendingReplies[replyId]; + resolveFn(payload); + return; + } + const { socketId } = msg; + const socket = this.originSockets[socketId]; + if (!socket) { + // origin socket already cleaned up + return; + } + if (type === 'close') { + socket.close(msg.code, msg.reason); + } else if (type === 'ping') { + // Reply instantly to the proxy socket, without forwarding to client + // The origin socket handles heartbeat for itself + const { replyTo, socketId } = msg; + const reply: RSAMessagePong = { + type: 'pong', + socketId, + }; + this.pub.publish(`${replyTo}`, this.pack(reply)); + } else if (type === 'send') { + socket.send(msg.message); + } + }; + + async maintainLock(documentName: string) { + this.locks[documentName] = setInterval(() => { + this.pub.set( + this.getKey(documentName), + this.serverId, + 'PX', + this.lockTTL, + ); + }, this.lockTTL / 2); + } + + async releaseLock(documentName: string) { + clearInterval(this.locks[documentName]); + delete this.locks[documentName]; + return this.pub.del(this.getKey(documentName)); + } + + private async handleEventLocally>( + eventName: TName, + documentName: string, + payload: any, + ) { + const handler = this.customEvents[eventName]; + if (!handler) throw new Error(`Invalid eventName: ${eventName}`); + const result = await handler(documentName, payload); + return result as Promise>; + } + + async handleEvent>( + eventName: TName, + documentName: string, + payload: any, + ) { + const isDocLoadedOnInstance = this.instance.documents.has(documentName); + + if (isDocLoadedOnInstance) { + return this.handleEventLocally(eventName, documentName, payload); + } + + const proxyTo = await this.getOrClaimLockThrottled(documentName); + if (proxyTo && proxyTo !== this.serverId) { + ++this.replyIdCounter; // bug in biome thinks this.replyIdCounter is not used if written on the line below + const replyId = this.replyIdCounter; + // another server owns the doc + const proxyMessage: RSAMessageCustomEventStart = { + eventName, + documentName, + payload, + replyTo: `${this.msgChannel}:${this.serverId}`, + replyId, + type: 'customEventStart', + }; + const msg = this.pack(proxyMessage); + this.pub.publish(`${this.msgChannel}:${proxyTo}`, msg); + // @ts-ignore + const { promise, resolve, reject } = Promise.withResolvers(); + this.pendingReplies[replyId] = resolve; + setTimeout(() => { + reject('TIMEOUT'); + }, this.customEventTTL); + return promise as Promise>; + } + // This server owns the document, but hocuspocus hasn't loaded it yet + return this.handleEventLocally(eventName, documentName, payload); + } + + async lockDocument(documentName: string) { + const proxyTo = await this.getOrClaimLockThrottled(documentName); + if (proxyTo && proxyTo !== this.serverId) { + throw new Error(`Could not lock document: ${documentName}`); + } + this.maintainLock(documentName); + return () => this.releaseLock(documentName); + } + + /* WebSocket Server Hooks */ + onSocketOpen( + ws: BaseWebSocket, + serializedHTTPRequest: SerializedHTTPRequest, + context = {}, + ) { + const socketId = serializedHTTPRequest.headers['sec-websocket-key']!; + this.originSockets[socketId] = ws; + this.instance.handleConnection( + ws as any, + serializedHTTPRequest as any, + context, + ); + } + + async onSocketMessage( + ws: BaseWebSocket, + serializedHTTPRequest: SerializedHTTPRequest, + detachableMsg: ArrayBuffer, + ) { + const message = new Uint8Array(detachableMsg.slice()); + const tmpMsg = new IncomingMessage(detachableMsg); + const documentName = readVarString(tmpMsg.decoder); + const isDocLoadedOnInstance = this.instance.documents.has(documentName); + + if (isDocLoadedOnInstance) { + ws.emit('message', message); + return; + } + + const proxyTo = await this.getOrClaimLockThrottled(documentName); + if (proxyTo && proxyTo !== this.serverId) { + // another server owns the doc + const proxyMessage: RSAMessageProxy = { + serializedHTTPRequest: serializedHTTPRequest, + replyTo: `${this.msgChannel}:${this.serverId}`, + message, + type: 'proxy', + }; + const msg = this.pack(proxyMessage); + this.pub.publish(`${this.msgChannel}:${proxyTo}`, msg); + return; + } + // This server owns the document, but hocuspocus hasn't loaded it yet + ws.emit('message', message); + } + + onSocketClose(socketId: string, code?: number, reason?: ArrayBuffer) { + const socket = this.originSockets[socketId]; + if (!socket) return; + // at this point the socket is considered GC'd and we cannot call close + // The origin socket did not set up any connections for the proxy, so none of the hooks will work if we just emit + socket?.emit('close', code, reason); + delete this.originSockets[socketId]; + const msg: RSAMessageCloseProxy = { type: 'closeProxy', socketId }; + this.pub.publish(this.msgChannel, this.pack(msg)).catch(() => {}); + } + + /* Hocuspocus hooks */ + async onConfigure({ instance }: onConfigurePayload) { + this.instance = instance; + } + + async onLoadDocument(data: onLoadDocumentPayload) { + const { documentName } = data; + // Refresh the lock TTL + this.maintainLock(documentName); + } + + async afterUnloadDocument(data: afterUnloadDocumentPayload) { + const { documentName } = data; + this.releaseLock(documentName); + // Broadcast to cluster to immediately remove the cached redis value + const msg: RSAMessageUnload = { type: 'unload', documentName }; + this.pub.publish(this.msgChannel, this.pack(msg)); + } + + async onDestroy() { + this.pub.disconnect(false); + this.sub.disconnect(false); + } +} diff --git a/apps/server/src/collaboration/extensions/redis-sync/redis-sync.types.ts b/apps/server/src/collaboration/extensions/redis-sync/redis-sync.types.ts new file mode 100644 index 00000000..1bbab80a --- /dev/null +++ b/apps/server/src/collaboration/extensions/redis-sync/redis-sync.types.ts @@ -0,0 +1,121 @@ +import EventEmitter from 'node:events'; +import { IncomingHttpHeaders } from 'node:http2'; +import RedisClient from 'ioredis'; + +export type SecondParam = T extends ( + arg1: unknown, + arg2: infer A, + ...args: unknown[] +) => unknown + ? A + : never; + +export type SerializedHTTPRequest = { + method: string; + url: string; + headers: IncomingHttpHeaders; + socket: { remoteAddress: string }; +}; + +export type RSAMessageProxy = { + type: 'proxy'; + replyTo: string; + message: Uint8Array; + serializedHTTPRequest: SerializedHTTPRequest; +}; + +export type RSAMessageCloseProxy = { + type: 'closeProxy'; + socketId: string; +}; + +export type RSAMessageUnload = { + type: 'unload'; + documentName: string; +}; + +export type RSAMessageClose = { + type: 'close'; + code?: number; + reason?: string; + socketId: string; +}; + +export type RSAMessagePing = { + type: 'ping'; + socketId: string; + replyTo: string; +}; + +export type RSAMessagePong = { + type: 'pong'; + socketId: string; +}; + +export type RSAMessageSend = { + type: 'send'; + // @ts-ignore + message: Uint8Array; + socketId: string; +}; + +export type RSAMessageCustomEventStart = { + type: 'customEventStart'; + documentName: string; + eventName: TName; + payload: TPayload; + replyTo: string; + replyId: number; +}; + +export type RSAMessageCustomEventComplete = { + type: 'customEventComplete'; + replyId: number; + payload: unknown; +}; + +export type RSAMessage = + | RSAMessageProxy + | RSAMessageCloseProxy + | RSAMessageUnload + | RSAMessageClose + | RSAMessagePing + | RSAMessagePong + | RSAMessageSend + | RSAMessageCustomEventStart + | RSAMessageCustomEventComplete; + +// @ts-ignore +export type Pack = (msg: RSAMessage) => string | Buffer; + +export type Unpack = ( + // @ts-ignore + packedMessage: Uint8Array | Buffer, +) => RSAMessage; + +type ServerId = string; +type DocumentName = string; +type CustomEventName = string; + +export type CustomEvents = Record< + CustomEventName, + (documentName: string, payload: unknown) => Promise +>; + +export interface Configuration { + redis: RedisClient; + pack: Pack; + unpack: Unpack; + serverId: ServerId; + lockTTL?: number; + customEventTTL?: number; + prefix?: string; + customEvents?: TCE; +} + +export type BaseWebSocket = EventEmitter & { + readyState: number; + close(code?: number, reason?: string): void; + ping(): void; + send(message: Uint8Array): void; +}; diff --git a/apps/server/src/collaboration/extensions/redis-sync/ws-socket-wrapper.ts b/apps/server/src/collaboration/extensions/redis-sync/ws-socket-wrapper.ts new file mode 100644 index 00000000..258e6e12 --- /dev/null +++ b/apps/server/src/collaboration/extensions/redis-sync/ws-socket-wrapper.ts @@ -0,0 +1,47 @@ +import { EventEmitter } from 'events'; +import type WebSocket from 'ws'; + +/** + * Wrapper around ws WebSocket that only receives events via emit(). + * This prevents double-handling when used with RedisSyncExtension. + */ +export class WsSocketWrapper extends EventEmitter { + private ws: WebSocket; + readyState = 1; + + constructor(ws: WebSocket) { + super(); + this.ws = ws; + this.once('close', () => { + this.readyState = 3; + }); + } + + close(code?: number, reason?: string) { + if (this.readyState !== 1) return; + this.readyState = 3; + try { + this.ws.close(code, reason); + } catch (e) { + // Socket already closed + } + } + + ping() { + if (this.readyState !== 1) return; + try { + this.ws.ping(); + } catch (e) { + // Socket already closed + } + } + + send(message: Uint8Array) { + if (this.readyState !== 1) return; + try { + this.ws.send(message); + } catch (e) { + // Socket already closed + } + } +} diff --git a/apps/server/src/collaboration/server/collab-main.ts b/apps/server/src/collaboration/server/collab-main.ts index 1a10167f..4a86a71b 100644 --- a/apps/server/src/collaboration/server/collab-main.ts +++ b/apps/server/src/collaboration/server/collab-main.ts @@ -12,9 +12,11 @@ async function bootstrap() { const app = await NestFactory.create( CollabAppModule, new FastifyAdapter({ - ignoreTrailingSlash: true, - ignoreDuplicateSlashes: true, - maxParamLength: 500, + routerOptions: { + maxParamLength: 1000, + ignoreTrailingSlash: true, + ignoreDuplicateSlashes: true, + }, }), { bufferLogs: true, diff --git a/package.json b/package.json index 2b5096ef..9ffe3d83 100644 --- a/package.json +++ b/package.json @@ -23,7 +23,6 @@ "@casl/ability": "6.8.0", "@docmost/editor-ext": "workspace:*", "@floating-ui/dom": "^1.7.3", - "@hocuspocus/extension-redis": "3.4.3", "@hocuspocus/provider": "3.4.3", "@hocuspocus/server": "3.4.3", "@hocuspocus/transformer": "3.4.3", diff --git a/packages/editor-ext/src/lib/attachment/attachment.ts b/packages/editor-ext/src/lib/attachment/attachment.ts index 0e37e014..a1e851a4 100644 --- a/packages/editor-ext/src/lib/attachment/attachment.ts +++ b/packages/editor-ext/src/lib/attachment/attachment.ts @@ -96,7 +96,7 @@ export const Attachment = Node.create({ mergeAttributes( { "data-type": this.name }, this.options.HTMLAttributes, - HTMLAttributes, + HTMLAttributes ), [ "a", diff --git a/packages/editor-ext/src/lib/image/image.ts b/packages/editor-ext/src/lib/image/image.ts index e6426f23..e0f5053d 100644 --- a/packages/editor-ext/src/lib/image/image.ts +++ b/packages/editor-ext/src/lib/image/image.ts @@ -25,7 +25,7 @@ declare module "@tiptap/core" { imageBlock: { setImage: (attributes: ImageAttributes) => ReturnType; setImageAt: ( - attributes: ImageAttributes & { pos: number | Range }, + attributes: ImageAttributes & { pos: number | Range } ) => ReturnType; setImageAlign: (align: "left" | "center" | "right") => ReturnType; setImageWidth: (width: number) => ReturnType; diff --git a/packages/editor-ext/src/lib/video/video.ts b/packages/editor-ext/src/lib/video/video.ts index 31c68f89..c3c6ab3e 100644 --- a/packages/editor-ext/src/lib/video/video.ts +++ b/packages/editor-ext/src/lib/video/video.ts @@ -23,7 +23,7 @@ declare module "@tiptap/core" { videoBlock: { setVideo: (attributes: VideoAttributes) => ReturnType; setVideoAt: ( - attributes: VideoAttributes & { pos: number | Range }, + attributes: VideoAttributes & { pos: number | Range } ) => ReturnType; setVideoAlign: (align: "left" | "center" | "right") => ReturnType; setVideoWidth: (width: number) => ReturnType; diff --git a/pnpm-lock.yaml b/pnpm-lock.yaml index e3904280..7af8c424 100644 --- a/pnpm-lock.yaml +++ b/pnpm-lock.yaml @@ -30,9 +30,6 @@ importers: '@floating-ui/dom': specifier: ^1.7.3 version: 1.7.3 - '@hocuspocus/extension-redis': - specifier: 3.4.3 - version: 3.4.3(y-protocols@1.0.6(yjs@13.6.29))(yjs@13.6.29) '@hocuspocus/provider': specifier: 3.4.3 version: 3.4.3(y-protocols@1.0.6(yjs@13.6.29))(yjs@13.6.29) @@ -554,12 +551,18 @@ importers: ldapts: specifier: ^7.4.0 version: 7.4.0 + lib0: + specifier: ^0.2.117 + version: 0.2.117 mammoth: specifier: ^1.11.0 version: 1.11.0 mime-types: specifier: ^2.1.35 version: 2.1.35 + msgpackr: + specifier: ^1.11.8 + version: 1.11.8 nanoid: specifier: 3.3.11 version: 3.3.11 @@ -632,6 +635,9 @@ importers: tmp-promise: specifier: ^3.0.3 version: 3.0.3 + tseep: + specifier: ^1.3.1 + version: 1.3.1 typesense: specifier: ^2.1.0 version: 2.1.0(@babel/runtime@7.25.6) @@ -2383,12 +2389,6 @@ packages: '@hocuspocus/common@3.4.3': resolution: {integrity: sha512-wnBBO9sWcVAoUPEXN1qO+zk3HaEF9VTemxB6kjuuH6e1dHnD0v12m4P4X1wiZVhmMIX/PMl/fu3MGtYWQJz8gA==} - '@hocuspocus/extension-redis@3.4.3': - resolution: {integrity: sha512-r64Vpgk6tt0VZaQPEo1dQuyur2ozr243ncDcDM+4gFPuV8ZRUjL1rvaJTidb2HCcAW2zjfwshNxw4+OixeksBA==} - peerDependencies: - y-protocols: ^1.0.6 - yjs: ^13.6.8 - '@hocuspocus/provider@3.4.3': resolution: {integrity: sha512-zt+UgVXGsEQrqnDZgavc2PT9yKJjmVjV+5YxvhlmFVFLVORqawT4l601aKmLPhvyK97un4ZApZ5rso8iO6crWg==} peerDependencies: @@ -3884,12 +3884,6 @@ packages: '@selderee/plugin-htmlparser2@0.11.0': resolution: {integrity: sha512-P33hHGdldxGabLFjPPpaTxVolMrzrcegejx+0GxjrIb9Zv48D8yAIA/QTDR2dFl7Uz7urX8aX6+5bCZslr+gWQ==} - '@sesamecare-oss/redlock@1.4.0': - resolution: {integrity: sha512-2z589R+yxKLN4CgKxP1oN4dsg6Y548SE4bVYam/R0kHk7Q9VrQ9l66q+k1ehhSLLY4or9hcchuF9/MhuuZdjJg==} - engines: {node: '>=16'} - peerDependencies: - ioredis: '>=5' - '@sinclair/typebox@0.27.8': resolution: {integrity: sha512-+Fj43pSMwJs4KRrH/938Uf+uAELIgVBmQzg/q1YG10djyfA3TnrU8N8XzqCh/okZdszqBQTZf96idMfE5lnwTA==} @@ -7593,13 +7587,8 @@ packages: resolution: {integrity: sha512-+bT2uH4E5LGE7h/n3evcS/sQlJXCpIp6ym8OWJ5eV6+67Dsql/LaaT7qJBAt2rzfoa/5QBGBhxDix1dMt2kQKQ==} engines: {node: '>= 0.8.0'} - lib0@0.2.114: - resolution: {integrity: sha512-gcxmNFzA4hv8UYi8j43uPlQ7CGcyMJ2KQb5kZASw6SnAKAf10hK12i2fjrS3Cl/ugZa5Ui6WwIu1/6MIXiHttQ==} - engines: {node: '>=16'} - hasBin: true - - lib0@0.2.88: - resolution: {integrity: sha512-KyroiEvCeZcZEMx5Ys+b4u4eEBbA1ch7XUaBhYpwa/nPMrzTjUhI4RfcytmQfYoTBPcdyx+FX6WFNIoNuJzJfQ==} + lib0@0.2.117: + resolution: {integrity: sha512-DeXj9X5xDCjgKLU/7RR+/HQEVzuuEUiwldwOGsHK/sfAfELGWEyTcf0x+uOvCvK3O2zPmZePXWL85vtia6GyZw==} engines: {node: '>=16'} hasBin: true @@ -7975,8 +7964,8 @@ packages: resolution: {integrity: sha512-SdzXp4kD/Qf8agZ9+iTu6eql0m3kWm1A2y1hkpTeVNENutaB0BwHlSvAIaMxwntmRUAUjon2V4L8Z/njd0Ct8A==} hasBin: true - msgpackr@1.11.2: - resolution: {integrity: sha512-F9UngXRlPyWCDEASDpTf6c9uNhGPTqnTeLVt7bN+bU1eajoR/8V9ys2BRaV5C/e5ihE6sJ9uPIKaYt6bFuO32g==} + msgpackr@1.11.8: + resolution: {integrity: sha512-bC4UGzHhVvgDNS7kn9tV8fAucIYUBuGojcaLiz7v+P63Lmtm0Xeji8B/8tYKddALXxJLpwIeBmUN3u64C4YkRA==} multimath@2.0.0: resolution: {integrity: sha512-toRx66cAMJ+Ccz7pMIg38xSIrtnbozk0dchXezwQDMgQmbGpfxjtv68H+L00iFL8hxDaVjrmwAFSb3I6bg8Q2g==} @@ -9671,6 +9660,9 @@ packages: resolution: {integrity: sha512-NoZ4roiN7LnbKn9QqE1amc9DJfzvZXxF4xDavcOWt1BPkdx+m+0gJuPM+S0vCe7zTJMYUP0R8pO2XMr+Y8oLIg==} engines: {node: '>=6'} + tseep@1.3.1: + resolution: {integrity: sha512-ZPtfk1tQnZVyr7BPtbJ93qaAh2lZuIOpTMjhrYa4XctT8xe7t4SAW9LIxrySDuYMsfNNayE51E/WNGrNVgVicQ==} + tslib@2.8.0: resolution: {integrity: sha512-jWVzBLplnCmoaTr13V9dYbiQ99wvZRd0vNWaDRg+aVYRcjDF3nDksxFDE/+fkXnKhpnUUkmx5pK/v8mCtLVqZA==} @@ -12659,27 +12651,13 @@ snapshots: '@hocuspocus/common@3.4.3': dependencies: - lib0: 0.2.114 - - '@hocuspocus/extension-redis@3.4.3(y-protocols@1.0.6(yjs@13.6.29))(yjs@13.6.29)': - dependencies: - '@hocuspocus/server': 3.4.3(y-protocols@1.0.6(yjs@13.6.29))(yjs@13.6.29) - '@sesamecare-oss/redlock': 1.4.0(ioredis@5.8.2) - ioredis: 5.8.2 - kleur: 4.1.5 - lodash.debounce: 4.0.8 - y-protocols: 1.0.6(yjs@13.6.29) - yjs: 13.6.29 - transitivePeerDependencies: - - bufferutil - - supports-color - - utf-8-validate + lib0: 0.2.117 '@hocuspocus/provider@3.4.3(y-protocols@1.0.6(yjs@13.6.29))(yjs@13.6.29)': dependencies: '@hocuspocus/common': 3.4.3 '@lifeomic/attempt': 3.0.3 - lib0: 0.2.114 + lib0: 0.2.117 ws: 8.19.0 y-protocols: 1.0.6(yjs@13.6.29) yjs: 13.6.29 @@ -12693,7 +12671,7 @@ snapshots: async-lock: 1.4.1 async-mutex: 0.5.0 kleur: 4.1.5 - lib0: 0.2.114 + lib0: 0.2.117 ws: 8.19.0 y-protocols: 1.0.6(yjs@13.6.29) yjs: 13.6.29 @@ -14153,10 +14131,6 @@ snapshots: domhandler: 5.0.3 selderee: 0.11.0 - '@sesamecare-oss/redlock@1.4.0(ioredis@5.8.2)': - dependencies: - ioredis: 5.8.2 - '@sinclair/typebox@0.27.8': {} '@sindresorhus/slugify@1.1.0': @@ -14867,7 +14841,7 @@ snapshots: '@tiptap/y-tiptap@3.0.1(prosemirror-model@1.25.1)(prosemirror-state@1.4.3)(prosemirror-view@1.40.0)(y-protocols@1.0.6(yjs@13.6.29))(yjs@13.6.29)': dependencies: - lib0: 0.2.114 + lib0: 0.2.117 prosemirror-model: 1.25.1 prosemirror-state: 1.4.3 prosemirror-view: 1.40.0 @@ -16065,7 +16039,7 @@ snapshots: dependencies: cron-parser: 4.9.0 ioredis: 5.8.2 - msgpackr: 1.11.2 + msgpackr: 1.11.8 node-abort-controller: 3.1.1 semver: 7.7.2 tslib: 2.8.1 @@ -18687,11 +18661,7 @@ snapshots: prelude-ls: 1.2.1 type-check: 0.4.0 - lib0@0.2.114: - dependencies: - isomorphic.js: 0.2.5 - - lib0@0.2.88: + lib0@0.2.117: dependencies: isomorphic.js: 0.2.5 @@ -19166,7 +19136,7 @@ snapshots: '@msgpackr-extract/msgpackr-extract-win32-x64': 3.0.2 optional: true - msgpackr@1.11.2: + msgpackr@1.11.8: optionalDependencies: msgpackr-extract: 3.0.2 @@ -21046,6 +21016,8 @@ snapshots: minimist: 1.2.8 strip-bom: 3.0.0 + tseep@1.3.1: {} + tslib@2.8.0: {} tslib@2.8.1: {} @@ -21519,12 +21491,12 @@ snapshots: y-indexeddb@9.0.12(yjs@13.6.29): dependencies: - lib0: 0.2.88 + lib0: 0.2.117 yjs: 13.6.29 y-prosemirror@1.3.7(prosemirror-model@1.25.1)(prosemirror-state@1.4.3)(prosemirror-view@1.40.0)(y-protocols@1.0.6(yjs@13.6.29))(yjs@13.6.29): dependencies: - lib0: 0.2.114 + lib0: 0.2.117 prosemirror-model: 1.25.1 prosemirror-state: 1.4.3 prosemirror-view: 1.40.0 @@ -21533,7 +21505,7 @@ snapshots: y-protocols@1.0.6(yjs@13.6.29): dependencies: - lib0: 0.2.114 + lib0: 0.2.117 yjs: 13.6.29 y18n@4.0.3: {} @@ -21586,7 +21558,7 @@ snapshots: yjs@13.6.29: dependencies: - lib0: 0.2.114 + lib0: 0.2.117 yn@3.1.1: {}