diff --git a/apps/server/src/collaboration/extensions/redis-sync/collab-proxy-socket.ts b/apps/server/src/collaboration/extensions/redis-sync/collab-proxy-socket.ts index a168af0c..2633fccf 100644 --- a/apps/server/src/collaboration/extensions/redis-sync/collab-proxy-socket.ts +++ b/apps/server/src/collaboration/extensions/redis-sync/collab-proxy-socket.ts @@ -5,7 +5,7 @@ import type { RSAMessageClose, RSAMessagePing, RSAMessageSend, -} from './redis-sync.extension'; +} from './redis-sync.types'; export class CollabProxySocket extends EventEmitter { private replyTo: string; diff --git a/apps/server/src/collaboration/extensions/redis-sync/redis-sync.extension.ts b/apps/server/src/collaboration/extensions/redis-sync/redis-sync.extension.ts index c4013126..fb01bb13 100644 --- a/apps/server/src/collaboration/extensions/redis-sync/redis-sync.extension.ts +++ b/apps/server/src/collaboration/extensions/redis-sync/redis-sync.extension.ts @@ -1,7 +1,5 @@ // Source https://github.com/ueberdosis/hocuspocus/pull/1008 -import type EventEmitter from 'node:events'; import type { IncomingMessage } from 'node:http'; -import type { IncomingHttpHeaders } from 'node:http2'; import { type Extension, type Hocuspocus, @@ -14,129 +12,36 @@ import type RedisClient from 'ioredis'; import { readVarString } from 'lib0/decoding.js'; import type { WebSocket } from 'ws'; import { CollabProxySocket } from './collab-proxy-socket'; +import { Injectable } from '@nestjs/common'; +import type { + BaseWebSocket, + Configuration, + CustomEvents, + Pack, + RSAMessage, + RSAMessageCloseProxy, + RSAMessageCustomEventComplete, + RSAMessageCustomEventStart, + RSAMessagePong, + RSAMessageProxy, + RSAMessageUnload, + SerializedHTTPRequest, + Unpack, +} from './redis-sync.types'; -export type SecondParam = T extends ( - arg1: unknown, - arg2: infer A, - ...args: unknown[] -) => unknown - ? A - : never; -export type RSAMessageProxy = { - type: 'proxy'; - replyTo: string; - // @ts-ignore - message: Uint8Array; - serializedHTTPRequest: SerializedHTTPRequest; -}; +export type { Pack, SerializedHTTPRequest } from './redis-sync.types'; -export type RSAMessageCloseProxy = { - type: 'closeProxy'; - socketId: string; -}; - -export type RSAMessageUnload = { - type: 'unload'; - documentName: string; -}; - -export type RSAMessageClose = { - type: 'close'; - code?: number; - reason?: string; - socketId: string; -}; - -export type RSAMessagePing = { - type: 'ping'; - socketId: string; - respondTo: string; -}; - -export type RSAMessagePong = { - type: 'pong'; - socketId: string; -}; - -export type RSAMessageSend = { - type: 'send'; - // @ts-ignore - message: Uint8Array; - socketId: string; -}; - -export type RSAMessageCustomEventStart = { - type: 'customEventStart'; - documentName: string; - eventName: TName; - payload: TPayload; - replyTo: string; - replyId: number; -}; - -export type RSAMessageCustomEventComplete = { - type: 'customEventComplete'; - replyId: number; - payload: unknown; -}; - -export type RSAMessage = - | RSAMessageProxy - | RSAMessageCloseProxy - | RSAMessageUnload - | RSAMessageClose - | RSAMessagePing - | RSAMessagePong - | RSAMessageSend - | RSAMessageCustomEventStart - | RSAMessageCustomEventComplete; - -export type SerializedHTTPRequest = { - method: string; - url: string; - headers: IncomingHttpHeaders & { 'sec-websocket-key': string }; - socket: { remoteAddress: string }; -}; -// @ts-ignore -export type Pack = (msg: RSAMessage) => string | Buffer; -type Unpack = ( - // @ts-ignore - packedMessage: Uint8Array | Buffer, -) => RSAMessage; type ServerId = string; type DocumentName = string; type SocketId = string; -type CustomEventName = string; -type CustomEvents = Record< - CustomEventName, - (documentName: string, payload: unknown) => Promise ->; - -interface Configuration { - redis: RedisClient; - pack: Pack; - unpack: Unpack; - serverId: ServerId; - lockTTL?: number; - customEventTTL?: number; - proxySocketTTL?: number; - prefix?: string; - customEvents?: TCE; -} - -interface BaseWebSocket extends EventEmitter { - readyState: number; - close(code?: number, reason?: string): void; - ping(): void; - send(message: Uint8Array): void; -} +@Injectable() export class RedisSyncExtension implements Extension { priority = 1000; - private pub: RedisClient; + private readonly pub: RedisClient; private sub: RedisClient; - private pack: Pack; - private unpack: Unpack; + private readonly pack: Pack; + private readonly unpack: Unpack; private originSockets: Record = {}; private locks: Record = {}; private lockPromises: Record> = {}; @@ -144,15 +49,15 @@ export class RedisSyncExtension implements Extension { SocketId, { socket: CollabProxySocket; cleanup: NodeJS.Timeout } > = {}; - private prefix: string; - private lockPrefix: string; - private msgChannel: string; - private serverId: ServerId; - private customEventTTL: number; - private lockTTL: number; - private proxySocketTTL: number; + private readonly prefix: string; + private readonly lockPrefix: string; + private readonly msgChannel: string; + private readonly serverId: ServerId; + private readonly customEventTTL: number; + private readonly lockTTL: number; + private readonly proxySocketTTL: number; private instance!: Hocuspocus; - private customEvents: TCE; + private readonly customEvents: TCE; private replyIdCounter = 0; private pendingReplies: Record< number, diff --git a/apps/server/src/collaboration/extensions/redis-sync/redis-sync.types.ts b/apps/server/src/collaboration/extensions/redis-sync/redis-sync.types.ts new file mode 100644 index 00000000..aefdcc19 --- /dev/null +++ b/apps/server/src/collaboration/extensions/redis-sync/redis-sync.types.ts @@ -0,0 +1,123 @@ +import type EventEmitter from 'node:events'; +import type { IncomingHttpHeaders } from 'node:http2'; +import type RedisClient from 'ioredis'; + +export type SecondParam = T extends ( + arg1: unknown, + arg2: infer A, + ...args: unknown[] +) => unknown + ? A + : never; + +export type SerializedHTTPRequest = { + method: string; + url: string; + headers: IncomingHttpHeaders & { 'sec-websocket-key': string }; + socket: { remoteAddress: string }; +}; + +export type RSAMessageProxy = { + type: 'proxy'; + replyTo: string; + // @ts-ignore + message: Uint8Array; + serializedHTTPRequest: SerializedHTTPRequest; +}; + +export type RSAMessageCloseProxy = { + type: 'closeProxy'; + socketId: string; +}; + +export type RSAMessageUnload = { + type: 'unload'; + documentName: string; +}; + +export type RSAMessageClose = { + type: 'close'; + code?: number; + reason?: string; + socketId: string; +}; + +export type RSAMessagePing = { + type: 'ping'; + socketId: string; + respondTo: string; +}; + +export type RSAMessagePong = { + type: 'pong'; + socketId: string; +}; + +export type RSAMessageSend = { + type: 'send'; + // @ts-ignore + message: Uint8Array; + socketId: string; +}; + +export type RSAMessageCustomEventStart = { + type: 'customEventStart'; + documentName: string; + eventName: TName; + payload: TPayload; + replyTo: string; + replyId: number; +}; + +export type RSAMessageCustomEventComplete = { + type: 'customEventComplete'; + replyId: number; + payload: unknown; +}; + +export type RSAMessage = + | RSAMessageProxy + | RSAMessageCloseProxy + | RSAMessageUnload + | RSAMessageClose + | RSAMessagePing + | RSAMessagePong + | RSAMessageSend + | RSAMessageCustomEventStart + | RSAMessageCustomEventComplete; + +// @ts-ignore +export type Pack = (msg: RSAMessage) => string | Buffer; + +export type Unpack = ( + // @ts-ignore + packedMessage: Uint8Array | Buffer, +) => RSAMessage; + +type ServerId = string; +type DocumentName = string; +type CustomEventName = string; + +export type CustomEvents = Record< + CustomEventName, + (documentName: string, payload: unknown) => Promise +>; + +export type Configuration = { + redis: RedisClient; + pack: Pack; + unpack: Unpack; + serverId: ServerId; + lockTTL?: number; + customEventTTL?: number; + proxySocketTTL?: number; + prefix?: string; + customEvents?: TCE; +}; + +export type BaseWebSocket = EventEmitter & { + readyState: number; + close(code?: number, reason?: string): void; + ping(): void; + send(message: Uint8Array): void; +};