import { Hocuspocus } from '@hocuspocus/server'; import { IncomingMessage } from 'http'; import WebSocket from 'ws'; import { AuthenticationExtension } from './extensions/authentication.extension'; import { PersistenceExtension } from './extensions/persistence.extension'; import { Injectable } from '@nestjs/common'; import { EnvironmentService } from '../integrations/environment/environment.service'; import { createRetryStrategy, parseRedisUrl, RedisConfig, } from '../common/helpers'; import { LoggerExtension } from './extensions/logger.extension'; import { RedisSyncExtension, SerializedHTTPRequest, } from './extensions/redis-sync'; import { WsSocketWrapper } from './extensions/redis-sync/ws-socket-wrapper'; import RedisClient from 'ioredis'; import { pack, unpack } from 'msgpackr'; import { nanoid } from 'nanoid'; import * as os from 'node:os'; import { CollabWsAdapter } from './adapter/collab-ws.adapter'; import { CollaborationHandler, CollabEventHandlers, } from './collaboration.handler'; @Injectable() export class CollaborationGateway { private readonly hocuspocus: Hocuspocus; private redisConfig: RedisConfig; // @ts-ignore private readonly redisSync: RedisSyncExtension | null = null; private readonly withRedis: boolean; constructor( private authenticationExtension: AuthenticationExtension, private persistenceExtension: PersistenceExtension, private loggerExtension: LoggerExtension, private environmentService: EnvironmentService, private collabEventsService: CollaborationHandler, ) { this.redisConfig = parseRedisUrl(this.environmentService.getRedisUrl()); this.withRedis = !this.environmentService.isCollabDisableRedis(); this.hocuspocus = new Hocuspocus({ debounce: 10000, maxDebounce: 45000, unloadImmediately: false, extensions: [ this.authenticationExtension, this.persistenceExtension, this.loggerExtension, ], }); if (this.withRedis) { // @ts-ignore this.redisSync = new RedisSyncExtension({ redis: new RedisClient({ host: this.redisConfig.host, port: this.redisConfig.port, password: this.redisConfig.password, db: this.redisConfig.db, family: this.redisConfig.family, retryStrategy: createRetryStrategy(), }), serverId: `collab-${os?.hostname()}-${nanoid(10)}`, prefix: 'collab', pack, unpack, // @ts-ignore customEvents: this.collabEventsService.getHandlers(this.hocuspocus), }); this.hocuspocus.configuration.extensions.push(this.redisSync); // @ts-ignore this.redisSync.onConfigure({ instance: this.hocuspocus }); } } private serializeRequest(request: IncomingMessage): SerializedHTTPRequest { return { method: request.method ?? 'GET', url: request.url ?? '/', headers: { 'sec-websocket-key': request.headers['sec-websocket-key'] ?? '', 'sec-websocket-protocol': request.headers['sec-websocket-protocol'] ?? '', }, socket: { remoteAddress: request.socket?.remoteAddress ?? '' }, }; } handleConnection(client: WebSocket, request: IncomingMessage): any { if (this.redisSync) { const serializedHTTPRequest = this.serializeRequest(request); const socketId = serializedHTTPRequest.headers['sec-websocket-key']; // Create wrapper socket that only receives events via emit() // This prevents double-handling since Hocuspocus won't listen to raw WebSocket events const wrappedSocket = new WsSocketWrapper(client); // Route through RedisSync extension (this calls handleConnection internally) this.redisSync.onSocketOpen(wrappedSocket as any, serializedHTTPRequest); // Forward raw WebSocket messages to the extension client.on('message', (data: ArrayBuffer) => { this.redisSync!.onSocketMessage( wrappedSocket as any, serializedHTTPRequest, data, ); }); // Forward close events client.on('close', (code: number, reason: Buffer) => { this.redisSync!.onSocketClose(socketId, code, reason); }); // Forward pong events for keepalive client.on('pong', (data: Buffer) => { wrappedSocket.emit('pong', data); }); } else { // Fallback to direct Hocuspocus connection this.hocuspocus.handleConnection(client, request); } } getConnectionCount() { return this.hocuspocus.getConnectionsCount(); } getDocumentCount() { return this.hocuspocus.getDocumentsCount(); } handleYjsEvent( eventName: TName, documentName: string, payload: Parameters[1], ) { return this.redisSync?.handleEvent(eventName, documentName, payload); } openDirectConnection(documentName: string, context?: any) { return this.hocuspocus.openDirectConnection(documentName, context); } /* *Can be used before calling openDirectConnection directly */ async lockDocument(documentName: string) { return this.redisSync.lockDocument(documentName); } /* *Releases a document lock and stops the interval that maintains it. */ async releaseLock(documentName: string) { return this.redisSync.releaseLock(documentName); } async destroy(collabWsAdapter: CollabWsAdapter): Promise { // eslint-disable-next-line no-async-promise-executor await new Promise(async (resolve) => { try { // Wait for all documents to unload this.hocuspocus.configuration.extensions.push({ async afterUnloadDocument({ instance }) { if (instance.getDocumentsCount() === 0) resolve(''); }, }); collabWsAdapter?.close(); if (this.hocuspocus.getDocumentsCount() === 0) resolve(''); this.hocuspocus.closeConnections(); } catch (error) { console.error(error); } }); await this.hocuspocus.hooks('onDestroy', { instance: this.hocuspocus }); } }