expose event handler

This commit is contained in:
Philipinho
2026-01-26 01:27:21 +00:00
parent 3157131bf2
commit 75673ad964
5 changed files with 93 additions and 31 deletions
@@ -19,24 +19,43 @@ import { WsSocketWrapper } from './extensions/redis-sync/ws-socket-wrapper';
import RedisClient from 'ioredis'; import RedisClient from 'ioredis';
import { pack, unpack } from 'msgpackr'; import { pack, unpack } from 'msgpackr';
import { CollabWsAdapter } from './adapter/collab-ws.adapter'; import { CollabWsAdapter } from './adapter/collab-ws.adapter';
import {
CollaborationHandler,
CollabEventHandlers,
} from './collaboration.handler';
@Injectable() @Injectable()
export class CollaborationGateway { export class CollaborationGateway {
private readonly hocuspocus: Hocuspocus; private readonly hocuspocus: Hocuspocus;
private redisConfig: RedisConfig; private redisConfig: RedisConfig;
private readonly redisSync: RedisSyncExtension<{}> | null = null; // @ts-ignore
private readonly useRedisSync: boolean; private readonly redisSync: RedisSyncExtension<CollabEventHandlers> | null =
null;
private readonly withRedis: boolean;
constructor( constructor(
private authenticationExtension: AuthenticationExtension, private authenticationExtension: AuthenticationExtension,
private persistenceExtension: PersistenceExtension, private persistenceExtension: PersistenceExtension,
private loggerExtension: LoggerExtension, private loggerExtension: LoggerExtension,
private environmentService: EnvironmentService, private environmentService: EnvironmentService,
private collabEventsService: CollaborationHandler,
) { ) {
this.redisConfig = parseRedisUrl(this.environmentService.getRedisUrl()); this.redisConfig = parseRedisUrl(this.environmentService.getRedisUrl());
this.useRedisSync = !this.environmentService.isCollabDisableRedis(); this.withRedis = !this.environmentService.isCollabDisableRedis();
if (this.useRedisSync) { this.hocuspocus = new Hocuspocus({
debounce: 10000,
maxDebounce: 45000,
unloadImmediately: false,
extensions: [
this.authenticationExtension,
this.persistenceExtension,
this.loggerExtension,
],
});
if (this.withRedis) {
// @ts-ignore
this.redisSync = new RedisSyncExtension({ this.redisSync = new RedisSyncExtension({
redis: new RedisClient({ redis: new RedisClient({
host: this.redisConfig.host, host: this.redisConfig.host,
@@ -47,24 +66,16 @@ export class CollaborationGateway {
retryStrategy: createRetryStrategy(), retryStrategy: createRetryStrategy(),
}), }),
serverId: `collab-${process.pid}`, serverId: `collab-${process.pid}`,
prefix: `collab`, prefix: 'collab',
pack, pack,
unpack, unpack,
customEvents: {}, // @ts-ignore
customEvents: this.collabEventsService.getHandlers(this.hocuspocus),
}); });
this.hocuspocus.configuration.extensions.push(this.redisSync);
// @ts-ignore
this.redisSync.onConfigure({ instance: this.hocuspocus });
} }
this.hocuspocus = new Hocuspocus({
debounce: 10000,
maxDebounce: 45000,
unloadImmediately: false,
extensions: [
this.authenticationExtension,
this.persistenceExtension,
this.loggerExtension,
...(this.redisSync ? [this.redisSync] : []),
],
});
} }
private serializeRequest(request: IncomingMessage): SerializedHTTPRequest { private serializeRequest(request: IncomingMessage): SerializedHTTPRequest {
@@ -123,6 +134,14 @@ export class CollaborationGateway {
return this.hocuspocus.getDocumentsCount(); return this.hocuspocus.getDocumentsCount();
} }
handleYjsEvent<TName extends keyof CollabEventHandlers>(
eventName: TName,
documentName: string,
payload: Parameters<CollabEventHandlers[TName]>[1],
) {
return this.redisSync?.handleEvent(eventName, documentName, payload);
}
async destroy(collabWsAdapter: CollabWsAdapter): Promise<void> { async destroy(collabWsAdapter: CollabWsAdapter): Promise<void> {
// eslint-disable-next-line no-async-promise-executor // eslint-disable-next-line no-async-promise-executor
await new Promise(async (resolve) => { await new Promise(async (resolve) => {
@@ -0,0 +1,42 @@
import { Injectable, Logger } from '@nestjs/common';
import { Hocuspocus, Document } from '@hocuspocus/server';
export type CollabEventHandlers = ReturnType<
CollaborationHandler['getHandlers']
>;
@Injectable()
export class CollaborationHandler {
private readonly logger = new Logger(CollaborationHandler.name);
constructor() {}
getHandlers(hocuspocus: Hocuspocus) {
return {
alterState: async (documentName: string, payload: { pageId: string }) => {
// dummy
// this.logger.log('Processing', documentName, payload);
// await this.withYdocConnection(hocuspocus, documentName, {}, (doc) => {
// const fragment = doc.getXmlFragment('default');
//});
},
};
}
async withYdocConnection(
hocuspocus: Hocuspocus,
documentName: string,
context: any = {},
fn: (doc: Document) => void,
): Promise<void> {
const connection = await hocuspocus.openDirectConnection(
documentName,
context,
);
try {
await connection.transact(fn);
} finally {
await connection.disconnect();
}
}
}
@@ -9,6 +9,7 @@ import { WebSocket } from 'ws';
import { TokenModule } from '../core/auth/token.module'; import { TokenModule } from '../core/auth/token.module';
import { HistoryListener } from './listeners/history.listener'; import { HistoryListener } from './listeners/history.listener';
import { LoggerExtension } from './extensions/logger.extension'; import { LoggerExtension } from './extensions/logger.extension';
import { CollaborationHandler } from './collaboration.handler';
@Module({ @Module({
providers: [ providers: [
@@ -17,6 +18,7 @@ import { LoggerExtension } from './extensions/logger.extension';
PersistenceExtension, PersistenceExtension,
LoggerExtension, LoggerExtension,
HistoryListener, HistoryListener,
CollaborationHandler,
], ],
exports: [CollaborationGateway], exports: [CollaborationGateway],
imports: [TokenModule], imports: [TokenModule],
@@ -1,19 +1,19 @@
// Adapted from https://github.com/ueberdosis/hocuspocus/pull/1008 - MIT // Adapted from https://github.com/ueberdosis/hocuspocus/pull/1008 - MIT
import type { IncomingMessage } from 'node:http'; import { IncomingMessage } from 'node:http';
import { import {
type Extension, Extension,
type Hocuspocus, Hocuspocus,
IncomingMessage as SocketIncomingMessage, IncomingMessage as SocketIncomingMessage,
type afterUnloadDocumentPayload, afterUnloadDocumentPayload,
type onConfigurePayload, onConfigurePayload,
type onLoadDocumentPayload, onLoadDocumentPayload,
} from '@hocuspocus/server'; } from '@hocuspocus/server';
import type RedisClient from 'ioredis'; import RedisClient from 'ioredis';
import { readVarString } from 'lib0/decoding.js'; import { readVarString } from 'lib0/decoding.js';
import type { WebSocket } from 'ws'; import { WebSocket } from 'ws';
import { CollabProxySocket } from './collab-proxy-socket'; import { CollabProxySocket } from './collab-proxy-socket';
import { Injectable, Logger } from '@nestjs/common'; import { Injectable, Logger } from '@nestjs/common';
import type { import {
BaseWebSocket, BaseWebSocket,
Configuration, Configuration,
CustomEvents, CustomEvents,
@@ -35,7 +35,6 @@ type ServerId = string;
type DocumentName = string; type DocumentName = string;
type SocketId = string; type SocketId = string;
@Injectable()
export class RedisSyncExtension<TCE extends CustomEvents> implements Extension { export class RedisSyncExtension<TCE extends CustomEvents> implements Extension {
private readonly logger = new Logger('Collab' + RedisSyncExtension.name); private readonly logger = new Logger('Collab' + RedisSyncExtension.name);
priority = 1000; priority = 1000;
@@ -1,6 +1,6 @@
import type EventEmitter from 'node:events'; import EventEmitter from 'node:events';
import type { IncomingHttpHeaders } from 'node:http2'; import { IncomingHttpHeaders } from 'node:http2';
import type RedisClient from 'ioredis'; import RedisClient from 'ioredis';
export type SecondParam<T> = T extends ( export type SecondParam<T> = T extends (
arg1: unknown, arg1: unknown,