From 75673ad96402bc5ab53615b06e80fcdc950eaa66 Mon Sep 17 00:00:00 2001 From: Philipinho <16838612+Philipinho@users.noreply.github.com> Date: Mon, 26 Jan 2026 01:27:21 +0000 Subject: [PATCH] expose event handler --- .../collaboration/collaboration.gateway.ts | 55 +++++++++++++------ .../collaboration/collaboration.handler.ts | 42 ++++++++++++++ .../src/collaboration/collaboration.module.ts | 2 + .../redis-sync/redis-sync.extension.ts | 19 +++---- .../extensions/redis-sync/redis-sync.types.ts | 6 +- 5 files changed, 93 insertions(+), 31 deletions(-) create mode 100644 apps/server/src/collaboration/collaboration.handler.ts diff --git a/apps/server/src/collaboration/collaboration.gateway.ts b/apps/server/src/collaboration/collaboration.gateway.ts index 53798baa..2f0f2085 100644 --- a/apps/server/src/collaboration/collaboration.gateway.ts +++ b/apps/server/src/collaboration/collaboration.gateway.ts @@ -19,24 +19,43 @@ import { WsSocketWrapper } from './extensions/redis-sync/ws-socket-wrapper'; import RedisClient from 'ioredis'; import { pack, unpack } from 'msgpackr'; import { CollabWsAdapter } from './adapter/collab-ws.adapter'; +import { + CollaborationHandler, + CollabEventHandlers, +} from './collaboration.handler'; @Injectable() export class CollaborationGateway { private readonly hocuspocus: Hocuspocus; private redisConfig: RedisConfig; - private readonly redisSync: RedisSyncExtension<{}> | null = null; - private readonly useRedisSync: boolean; + // @ts-ignore + private readonly redisSync: RedisSyncExtension | null = + null; + private readonly withRedis: boolean; constructor( private authenticationExtension: AuthenticationExtension, private persistenceExtension: PersistenceExtension, private loggerExtension: LoggerExtension, private environmentService: EnvironmentService, + private collabEventsService: CollaborationHandler, ) { this.redisConfig = parseRedisUrl(this.environmentService.getRedisUrl()); - this.useRedisSync = !this.environmentService.isCollabDisableRedis(); + this.withRedis = !this.environmentService.isCollabDisableRedis(); - if (this.useRedisSync) { + this.hocuspocus = new Hocuspocus({ + debounce: 10000, + maxDebounce: 45000, + unloadImmediately: false, + extensions: [ + this.authenticationExtension, + this.persistenceExtension, + this.loggerExtension, + ], + }); + + if (this.withRedis) { + // @ts-ignore this.redisSync = new RedisSyncExtension({ redis: new RedisClient({ host: this.redisConfig.host, @@ -47,24 +66,16 @@ export class CollaborationGateway { retryStrategy: createRetryStrategy(), }), serverId: `collab-${process.pid}`, - prefix: `collab`, + prefix: 'collab', pack, unpack, - customEvents: {}, + // @ts-ignore + customEvents: this.collabEventsService.getHandlers(this.hocuspocus), }); + this.hocuspocus.configuration.extensions.push(this.redisSync); + // @ts-ignore + this.redisSync.onConfigure({ instance: this.hocuspocus }); } - - this.hocuspocus = new Hocuspocus({ - debounce: 10000, - maxDebounce: 45000, - unloadImmediately: false, - extensions: [ - this.authenticationExtension, - this.persistenceExtension, - this.loggerExtension, - ...(this.redisSync ? [this.redisSync] : []), - ], - }); } private serializeRequest(request: IncomingMessage): SerializedHTTPRequest { @@ -123,6 +134,14 @@ export class CollaborationGateway { return this.hocuspocus.getDocumentsCount(); } + handleYjsEvent( + eventName: TName, + documentName: string, + payload: Parameters[1], + ) { + return this.redisSync?.handleEvent(eventName, documentName, payload); + } + async destroy(collabWsAdapter: CollabWsAdapter): Promise { // eslint-disable-next-line no-async-promise-executor await new Promise(async (resolve) => { diff --git a/apps/server/src/collaboration/collaboration.handler.ts b/apps/server/src/collaboration/collaboration.handler.ts new file mode 100644 index 00000000..ec746550 --- /dev/null +++ b/apps/server/src/collaboration/collaboration.handler.ts @@ -0,0 +1,42 @@ +import { Injectable, Logger } from '@nestjs/common'; +import { Hocuspocus, Document } from '@hocuspocus/server'; + +export type CollabEventHandlers = ReturnType< + CollaborationHandler['getHandlers'] +>; + +@Injectable() +export class CollaborationHandler { + private readonly logger = new Logger(CollaborationHandler.name); + + constructor() {} + + getHandlers(hocuspocus: Hocuspocus) { + return { + alterState: async (documentName: string, payload: { pageId: string }) => { + // dummy + // this.logger.log('Processing', documentName, payload); + // await this.withYdocConnection(hocuspocus, documentName, {}, (doc) => { + // const fragment = doc.getXmlFragment('default'); + //}); + }, + }; + } + + async withYdocConnection( + hocuspocus: Hocuspocus, + documentName: string, + context: any = {}, + fn: (doc: Document) => void, + ): Promise { + const connection = await hocuspocus.openDirectConnection( + documentName, + context, + ); + try { + await connection.transact(fn); + } finally { + await connection.disconnect(); + } + } +} diff --git a/apps/server/src/collaboration/collaboration.module.ts b/apps/server/src/collaboration/collaboration.module.ts index 4f085e93..e9374c53 100644 --- a/apps/server/src/collaboration/collaboration.module.ts +++ b/apps/server/src/collaboration/collaboration.module.ts @@ -9,6 +9,7 @@ import { WebSocket } from 'ws'; import { TokenModule } from '../core/auth/token.module'; import { HistoryListener } from './listeners/history.listener'; import { LoggerExtension } from './extensions/logger.extension'; +import { CollaborationHandler } from './collaboration.handler'; @Module({ providers: [ @@ -17,6 +18,7 @@ import { LoggerExtension } from './extensions/logger.extension'; PersistenceExtension, LoggerExtension, HistoryListener, + CollaborationHandler, ], exports: [CollaborationGateway], imports: [TokenModule], diff --git a/apps/server/src/collaboration/extensions/redis-sync/redis-sync.extension.ts b/apps/server/src/collaboration/extensions/redis-sync/redis-sync.extension.ts index 7d4400ad..7ca1198f 100644 --- a/apps/server/src/collaboration/extensions/redis-sync/redis-sync.extension.ts +++ b/apps/server/src/collaboration/extensions/redis-sync/redis-sync.extension.ts @@ -1,19 +1,19 @@ // Adapted from https://github.com/ueberdosis/hocuspocus/pull/1008 - MIT -import type { IncomingMessage } from 'node:http'; +import { IncomingMessage } from 'node:http'; import { - type Extension, - type Hocuspocus, + Extension, + Hocuspocus, IncomingMessage as SocketIncomingMessage, - type afterUnloadDocumentPayload, - type onConfigurePayload, - type onLoadDocumentPayload, + afterUnloadDocumentPayload, + onConfigurePayload, + onLoadDocumentPayload, } from '@hocuspocus/server'; -import type RedisClient from 'ioredis'; +import RedisClient from 'ioredis'; import { readVarString } from 'lib0/decoding.js'; -import type { WebSocket } from 'ws'; +import { WebSocket } from 'ws'; import { CollabProxySocket } from './collab-proxy-socket'; import { Injectable, Logger } from '@nestjs/common'; -import type { +import { BaseWebSocket, Configuration, CustomEvents, @@ -35,7 +35,6 @@ type ServerId = string; type DocumentName = string; type SocketId = string; -@Injectable() export class RedisSyncExtension implements Extension { private readonly logger = new Logger('Collab' + RedisSyncExtension.name); priority = 1000; diff --git a/apps/server/src/collaboration/extensions/redis-sync/redis-sync.types.ts b/apps/server/src/collaboration/extensions/redis-sync/redis-sync.types.ts index aefdcc19..7762fc0f 100644 --- a/apps/server/src/collaboration/extensions/redis-sync/redis-sync.types.ts +++ b/apps/server/src/collaboration/extensions/redis-sync/redis-sync.types.ts @@ -1,6 +1,6 @@ -import type EventEmitter from 'node:events'; -import type { IncomingHttpHeaders } from 'node:http2'; -import type RedisClient from 'ioredis'; +import EventEmitter from 'node:events'; +import { IncomingHttpHeaders } from 'node:http2'; +import RedisClient from 'ioredis'; export type SecondParam = T extends ( arg1: unknown,