mirror of
https://github.com/docmost/docmost.git
synced 2026-05-07 06:23:06 +08:00
feat(collab): better redis extension
This commit is contained in:
@@ -75,8 +75,10 @@
|
||||
"kysely": "^0.28.2",
|
||||
"kysely-migration-cli": "^0.4.2",
|
||||
"ldapts": "^7.4.0",
|
||||
"lib0": "^0.2.117",
|
||||
"mammoth": "^1.11.0",
|
||||
"mime-types": "^2.1.35",
|
||||
"msgpackr": "^1.11.8",
|
||||
"nanoid": "3.3.11",
|
||||
"nestjs-kysely": "^1.2.0",
|
||||
"nodemailer": "^7.0.12",
|
||||
@@ -98,6 +100,7 @@
|
||||
"socket.io": "^4.8.3",
|
||||
"stripe": "^17.5.0",
|
||||
"tmp-promise": "^3.0.3",
|
||||
"tseep": "^1.3.1",
|
||||
"typesense": "^2.1.0",
|
||||
"ws": "^8.19.0",
|
||||
"yauzl": "^3.2.0"
|
||||
|
||||
@@ -1,10 +1,9 @@
|
||||
import { Hocuspocus, Server as HocuspocusServer } from '@hocuspocus/server';
|
||||
import { Hocuspocus } from '@hocuspocus/server';
|
||||
import { IncomingMessage } from 'http';
|
||||
import WebSocket from 'ws';
|
||||
import { AuthenticationExtension } from './extensions/authentication.extension';
|
||||
import { PersistenceExtension } from './extensions/persistence.extension';
|
||||
import { Injectable } from '@nestjs/common';
|
||||
import { Redis } from '@hocuspocus/extension-redis';
|
||||
import { EnvironmentService } from '../integrations/environment/environment.service';
|
||||
import {
|
||||
createRetryStrategy,
|
||||
@@ -12,11 +11,20 @@ import {
|
||||
RedisConfig,
|
||||
} from '../common/helpers';
|
||||
import { LoggerExtension } from './extensions/logger.extension';
|
||||
import {
|
||||
RedisSyncExtension,
|
||||
SerializedHTTPRequest,
|
||||
} from './extensions/redis-sync';
|
||||
import { WsSocketWrapper } from './extensions/redis-sync/ws-socket-wrapper';
|
||||
import RedisClient from 'ioredis';
|
||||
import { pack, unpack } from 'msgpackr';
|
||||
|
||||
@Injectable()
|
||||
export class CollaborationGateway {
|
||||
private hocuspocus: Hocuspocus;
|
||||
private redisConfig: RedisConfig;
|
||||
private redisSync: RedisSyncExtension<{}> | null = null;
|
||||
private useRedisSync: boolean;
|
||||
|
||||
constructor(
|
||||
private authenticationExtension: AuthenticationExtension,
|
||||
@@ -25,6 +33,24 @@ export class CollaborationGateway {
|
||||
private environmentService: EnvironmentService,
|
||||
) {
|
||||
this.redisConfig = parseRedisUrl(this.environmentService.getRedisUrl());
|
||||
this.useRedisSync = !this.environmentService.isCollabDisableRedis();
|
||||
|
||||
if (this.useRedisSync) {
|
||||
this.redisSync = new RedisSyncExtension({
|
||||
redis: new RedisClient({
|
||||
host: this.redisConfig.host,
|
||||
port: this.redisConfig.port,
|
||||
password: this.redisConfig.password,
|
||||
db: this.redisConfig.db,
|
||||
family: this.redisConfig.family,
|
||||
retryStrategy: createRetryStrategy(),
|
||||
}),
|
||||
serverId: `collab-${process.pid}`,
|
||||
pack,
|
||||
unpack,
|
||||
customEvents: {},
|
||||
});
|
||||
}
|
||||
|
||||
this.hocuspocus = new Hocuspocus({
|
||||
debounce: 10000,
|
||||
@@ -34,26 +60,57 @@ export class CollaborationGateway {
|
||||
this.authenticationExtension,
|
||||
this.persistenceExtension,
|
||||
this.loggerExtension,
|
||||
...(this.environmentService.isCollabDisableRedis()
|
||||
? []
|
||||
: [
|
||||
new Redis({
|
||||
host: this.redisConfig.host,
|
||||
port: this.redisConfig.port,
|
||||
options: {
|
||||
password: this.redisConfig.password,
|
||||
db: this.redisConfig.db,
|
||||
family: this.redisConfig.family,
|
||||
retryStrategy: createRetryStrategy(),
|
||||
},
|
||||
}),
|
||||
]),
|
||||
...(this.redisSync ? [this.redisSync] : []),
|
||||
],
|
||||
});
|
||||
}
|
||||
|
||||
private serializeRequest(request: IncomingMessage): SerializedHTTPRequest {
|
||||
return {
|
||||
method: request.method ?? 'GET',
|
||||
url: request.url ?? '/',
|
||||
headers: {
|
||||
...request.headers,
|
||||
'sec-websocket-key': request.headers['sec-websocket-key'] ?? '',
|
||||
} as SerializedHTTPRequest['headers'],
|
||||
socket: { remoteAddress: request.socket?.remoteAddress ?? '' },
|
||||
};
|
||||
}
|
||||
|
||||
handleConnection(client: WebSocket, request: IncomingMessage): any {
|
||||
this.hocuspocus.handleConnection(client, request);
|
||||
if (this.redisSync) {
|
||||
const serializedRequest = this.serializeRequest(request);
|
||||
const socketId = serializedRequest.headers['sec-websocket-key'];
|
||||
|
||||
// Create wrapper socket that only receives events via emit()
|
||||
// This prevents double-handling since Hocuspocus won't listen to raw WebSocket events
|
||||
const wrappedSocket = new WsSocketWrapper(client);
|
||||
|
||||
// Route through RedisSync extension (this calls handleConnection internally)
|
||||
this.redisSync.onSocketOpen(wrappedSocket as any, serializedRequest);
|
||||
|
||||
// Forward raw WebSocket messages to the extension
|
||||
client.on('message', (data: ArrayBuffer) => {
|
||||
this.redisSync!.onSocketMessage(
|
||||
wrappedSocket as any,
|
||||
serializedRequest,
|
||||
data,
|
||||
);
|
||||
});
|
||||
|
||||
// Forward close events
|
||||
client.on('close', (code: number, reason: Buffer) => {
|
||||
this.redisSync!.onSocketClose(socketId, code, reason);
|
||||
});
|
||||
|
||||
// Forward pong events for keepalive
|
||||
client.on('pong', (data: Buffer) => {
|
||||
wrappedSocket.emit('pong', data);
|
||||
});
|
||||
} else {
|
||||
// Fallback to direct Hocuspocus connection
|
||||
this.hocuspocus.handleConnection(client, request);
|
||||
}
|
||||
}
|
||||
|
||||
getConnectionCount() {
|
||||
|
||||
@@ -46,7 +46,7 @@ export class CollaborationModule implements OnModuleInit, OnModuleDestroy {
|
||||
});
|
||||
|
||||
wss.on('error', (error) =>
|
||||
this.logger.log('WebSocket server error:', error),
|
||||
this.logger.error('WebSocket server error:', error),
|
||||
);
|
||||
}
|
||||
|
||||
|
||||
@@ -0,0 +1,70 @@
|
||||
import type RedisClient from 'ioredis';
|
||||
import { EventEmitter } from 'tseep';
|
||||
import type {
|
||||
Pack,
|
||||
RSAMessageClose,
|
||||
RSAMessagePing,
|
||||
RSAMessageSend,
|
||||
} from './redis-sync.extension';
|
||||
|
||||
export class CollabProxySocket extends EventEmitter {
|
||||
private replyTo: string;
|
||||
private pongChannel: string;
|
||||
private socketId: string;
|
||||
private pub: RedisClient;
|
||||
private pack: Pack;
|
||||
readyState = 1;
|
||||
|
||||
constructor(
|
||||
pub: RedisClient,
|
||||
pack: Pack,
|
||||
replyTo: string,
|
||||
pongChannel: string,
|
||||
socketId: string,
|
||||
) {
|
||||
super();
|
||||
this.replyTo = replyTo;
|
||||
this.pongChannel = pongChannel;
|
||||
this.socketId = socketId;
|
||||
this.pub = pub;
|
||||
this.pack = pack;
|
||||
this.once('close', () => {
|
||||
this.readyState = 3;
|
||||
});
|
||||
}
|
||||
|
||||
private publish(msg: RSAMessageClose | RSAMessagePing | RSAMessageSend) {
|
||||
this.pub.publish(this.replyTo, this.pack(msg));
|
||||
}
|
||||
|
||||
close(code?: number, reason?: string) {
|
||||
if (this.readyState !== 1) return;
|
||||
const msg: RSAMessageClose = {
|
||||
type: 'close',
|
||||
code,
|
||||
reason,
|
||||
socketId: this.socketId,
|
||||
};
|
||||
this.publish(msg);
|
||||
}
|
||||
|
||||
ping() {
|
||||
if (this.readyState !== 1) return;
|
||||
const msg: RSAMessagePing = {
|
||||
type: 'ping',
|
||||
socketId: this.socketId,
|
||||
respondTo: this.pongChannel,
|
||||
};
|
||||
this.publish(msg);
|
||||
}
|
||||
|
||||
send(message: Uint8Array) {
|
||||
if (this.readyState !== 1) return;
|
||||
const msg: RSAMessageSend = {
|
||||
type: 'send',
|
||||
socketId: this.socketId,
|
||||
message,
|
||||
};
|
||||
this.publish(msg);
|
||||
}
|
||||
}
|
||||
@@ -0,0 +1,2 @@
|
||||
export * from './redis-sync.extension';
|
||||
export type { SerializedHTTPRequest } from './redis-sync.extension';
|
||||
@@ -0,0 +1,489 @@
|
||||
// Source https://github.com/ueberdosis/hocuspocus/pull/1008
|
||||
import type EventEmitter from 'node:events';
|
||||
import type { IncomingMessage } from 'node:http';
|
||||
import type { IncomingHttpHeaders } from 'node:http2';
|
||||
import {
|
||||
type Extension,
|
||||
type Hocuspocus,
|
||||
IncomingMessage as SocketIncomingMessage,
|
||||
type afterUnloadDocumentPayload,
|
||||
type onConfigurePayload,
|
||||
type onLoadDocumentPayload,
|
||||
} from '@hocuspocus/server';
|
||||
import type RedisClient from 'ioredis';
|
||||
import { readVarString } from 'lib0/decoding.js';
|
||||
import type { WebSocket } from 'ws';
|
||||
import { CollabProxySocket } from './collab-proxy-socket';
|
||||
|
||||
export type SecondParam<T> = T extends (
|
||||
arg1: unknown,
|
||||
arg2: infer A,
|
||||
...args: unknown[]
|
||||
) => unknown
|
||||
? A
|
||||
: never;
|
||||
export type RSAMessageProxy = {
|
||||
type: 'proxy';
|
||||
replyTo: string;
|
||||
// @ts-ignore
|
||||
message: Uint8Array<ArrayBufferLike>;
|
||||
serializedHTTPRequest: SerializedHTTPRequest;
|
||||
};
|
||||
|
||||
export type RSAMessageCloseProxy = {
|
||||
type: 'closeProxy';
|
||||
socketId: string;
|
||||
};
|
||||
|
||||
export type RSAMessageUnload = {
|
||||
type: 'unload';
|
||||
documentName: string;
|
||||
};
|
||||
|
||||
export type RSAMessageClose = {
|
||||
type: 'close';
|
||||
code?: number;
|
||||
reason?: string;
|
||||
socketId: string;
|
||||
};
|
||||
|
||||
export type RSAMessagePing = {
|
||||
type: 'ping';
|
||||
socketId: string;
|
||||
respondTo: string;
|
||||
};
|
||||
|
||||
export type RSAMessagePong = {
|
||||
type: 'pong';
|
||||
socketId: string;
|
||||
};
|
||||
|
||||
export type RSAMessageSend = {
|
||||
type: 'send';
|
||||
// @ts-ignore
|
||||
message: Uint8Array<ArrayBufferLike>;
|
||||
socketId: string;
|
||||
};
|
||||
|
||||
export type RSAMessageCustomEventStart<TName = string, TPayload = unknown> = {
|
||||
type: 'customEventStart';
|
||||
documentName: string;
|
||||
eventName: TName;
|
||||
payload: TPayload;
|
||||
replyTo: string;
|
||||
replyId: number;
|
||||
};
|
||||
|
||||
export type RSAMessageCustomEventComplete = {
|
||||
type: 'customEventComplete';
|
||||
replyId: number;
|
||||
payload: unknown;
|
||||
};
|
||||
|
||||
export type RSAMessage =
|
||||
| RSAMessageProxy
|
||||
| RSAMessageCloseProxy
|
||||
| RSAMessageUnload
|
||||
| RSAMessageClose
|
||||
| RSAMessagePing
|
||||
| RSAMessagePong
|
||||
| RSAMessageSend
|
||||
| RSAMessageCustomEventStart
|
||||
| RSAMessageCustomEventComplete;
|
||||
|
||||
export type SerializedHTTPRequest = {
|
||||
method: string;
|
||||
url: string;
|
||||
headers: IncomingHttpHeaders & { 'sec-websocket-key': string };
|
||||
socket: { remoteAddress: string };
|
||||
};
|
||||
// @ts-ignore
|
||||
export type Pack = (msg: RSAMessage) => string | Buffer<ArrayBufferLike>;
|
||||
type Unpack = (
|
||||
// @ts-ignore
|
||||
packedMessage: Uint8Array | Buffer<ArrayBufferLike>,
|
||||
) => RSAMessage;
|
||||
type ServerId = string;
|
||||
type DocumentName = string;
|
||||
type SocketId = string;
|
||||
type CustomEventName = string;
|
||||
type CustomEvents = Record<
|
||||
CustomEventName,
|
||||
(documentName: string, payload: unknown) => Promise<unknown>
|
||||
>;
|
||||
|
||||
interface Configuration<TCE> {
|
||||
redis: RedisClient;
|
||||
pack: Pack;
|
||||
unpack: Unpack;
|
||||
serverId: ServerId;
|
||||
lockTTL?: number;
|
||||
customEventTTL?: number;
|
||||
proxySocketTTL?: number;
|
||||
prefix?: string;
|
||||
customEvents?: TCE;
|
||||
}
|
||||
|
||||
interface BaseWebSocket extends EventEmitter {
|
||||
readyState: number;
|
||||
close(code?: number, reason?: string): void;
|
||||
ping(): void;
|
||||
send(message: Uint8Array): void;
|
||||
}
|
||||
|
||||
export class RedisSyncExtension<TCE extends CustomEvents> implements Extension {
|
||||
priority = 1000;
|
||||
private pub: RedisClient;
|
||||
private sub: RedisClient;
|
||||
private pack: Pack;
|
||||
private unpack: Unpack;
|
||||
private originSockets: Record<SocketId, BaseWebSocket> = {};
|
||||
private locks: Record<DocumentName, NodeJS.Timeout> = {};
|
||||
private lockPromises: Record<DocumentName, Promise<ServerId | null>> = {};
|
||||
private proxySockets: Record<
|
||||
SocketId,
|
||||
{ socket: CollabProxySocket; cleanup: NodeJS.Timeout }
|
||||
> = {};
|
||||
private prefix: string;
|
||||
private lockPrefix: string;
|
||||
private msgChannel: string;
|
||||
private serverId: ServerId;
|
||||
private customEventTTL: number;
|
||||
private lockTTL: number;
|
||||
private proxySocketTTL: number;
|
||||
private instance!: Hocuspocus;
|
||||
private customEvents: TCE;
|
||||
private replyIdCounter = 0;
|
||||
private pendingReplies: Record<
|
||||
number,
|
||||
// @ts-ignore
|
||||
PromiseWithResolvers<unknown>['resolve']
|
||||
> = {};
|
||||
constructor(configuration: Configuration<TCE>) {
|
||||
const {
|
||||
redis,
|
||||
pack,
|
||||
unpack,
|
||||
serverId,
|
||||
lockTTL,
|
||||
prefix,
|
||||
proxySocketTTL,
|
||||
customEvents,
|
||||
customEventTTL,
|
||||
} = configuration;
|
||||
this.pub = redis.duplicate();
|
||||
this.sub = redis.duplicate();
|
||||
this.pack = pack;
|
||||
this.unpack = unpack;
|
||||
this.serverId = serverId;
|
||||
this.lockTTL = lockTTL ?? 10_000;
|
||||
this.proxySocketTTL = proxySocketTTL ?? 30_000;
|
||||
this.customEventTTL = customEventTTL ?? 30_000;
|
||||
this.prefix = prefix ?? 'rsa';
|
||||
this.lockPrefix = `${this.prefix}Lock`;
|
||||
this.msgChannel = `${this.prefix}Msg`;
|
||||
this.customEvents = (customEvents ?? {}) as unknown as TCE;
|
||||
this.sub.subscribe(this.msgChannel, `${this.msgChannel}:${this.serverId}`);
|
||||
this.sub.on('messageBuffer', this.handleRedisMessage);
|
||||
}
|
||||
private getKey(documentName: string) {
|
||||
return `${this.lockPrefix}:${documentName}`;
|
||||
}
|
||||
|
||||
private closeProxy(socketId: string) {
|
||||
const socketRecord = this.proxySockets[socketId];
|
||||
if (!socketRecord) return;
|
||||
clearTimeout(socketRecord.cleanup);
|
||||
socketRecord.socket.emit('close', 1000, 'proxy_cleanup');
|
||||
delete this.proxySockets[socketId];
|
||||
}
|
||||
|
||||
private emitPong(socketId: string) {
|
||||
const socketRecord = this.proxySockets[socketId];
|
||||
if (socketRecord) {
|
||||
socketRecord.socket.emit('pong');
|
||||
}
|
||||
}
|
||||
|
||||
private handleProxyMessage(
|
||||
msg: Pick<RSAMessageProxy, 'replyTo' | 'message' | 'serializedHTTPRequest'>,
|
||||
) {
|
||||
const { replyTo, message, serializedHTTPRequest } = msg;
|
||||
const { headers } = serializedHTTPRequest;
|
||||
const socketId = headers['sec-websocket-key'];
|
||||
let socketRecord = this.proxySockets[socketId];
|
||||
const cleanup = setTimeout(() => {
|
||||
const record = this.proxySockets[socketId];
|
||||
if (record) {
|
||||
record.socket.emit('close', 1000, 'ttl_expired');
|
||||
delete this.proxySockets[socketId];
|
||||
}
|
||||
}, this.proxySocketTTL);
|
||||
if (!socketRecord) {
|
||||
const socket = new CollabProxySocket(
|
||||
this.pub,
|
||||
this.pack,
|
||||
replyTo,
|
||||
`${this.msgChannel}:${this.serverId}`,
|
||||
socketId,
|
||||
);
|
||||
socketRecord = { socket, cleanup };
|
||||
this.proxySockets[socketId] = socketRecord;
|
||||
this.instance.handleConnection(
|
||||
socket as unknown as WebSocket,
|
||||
serializedHTTPRequest as unknown as IncomingMessage,
|
||||
{},
|
||||
);
|
||||
} else {
|
||||
clearTimeout(socketRecord.cleanup);
|
||||
socketRecord.cleanup = cleanup;
|
||||
}
|
||||
socketRecord.socket.emit('message', message);
|
||||
}
|
||||
|
||||
private getOrClaimLock(documentName: string) {
|
||||
const lockPromise = this.pub.set(
|
||||
this.getKey(documentName),
|
||||
this.serverId,
|
||||
'PX',
|
||||
this.lockTTL,
|
||||
'NX',
|
||||
'GET',
|
||||
);
|
||||
this.lockPromises[documentName] = lockPromise;
|
||||
// Briefly cache the serverId that claimed the doc to reduce load on redis
|
||||
// When the claimant unloads the doc, it will send an unload message to immediately clear this
|
||||
// a lockTTL / 2 guarantees stale reads < lockTTL upon server crash
|
||||
setTimeout(() => {
|
||||
delete this.lockPromises[documentName];
|
||||
}, this.lockTTL / 2);
|
||||
return lockPromise;
|
||||
}
|
||||
|
||||
private getOrClaimLockThrottled(documentName: string) {
|
||||
const existingWorkerIdPromise = this.lockPromises[documentName];
|
||||
if (existingWorkerIdPromise) return existingWorkerIdPromise;
|
||||
return this.getOrClaimLock(documentName);
|
||||
}
|
||||
|
||||
private handleRedisMessage = async (
|
||||
_channel: Buffer,
|
||||
packedMessage: Buffer,
|
||||
) => {
|
||||
const msg = this.unpack(packedMessage) as RSAMessage;
|
||||
const { type } = msg;
|
||||
if (type === 'proxy') {
|
||||
this.handleProxyMessage(msg);
|
||||
return;
|
||||
}
|
||||
if (type === 'closeProxy') {
|
||||
this.closeProxy(msg.socketId);
|
||||
return;
|
||||
}
|
||||
if (type === 'unload') {
|
||||
delete this.lockPromises[msg.documentName];
|
||||
return;
|
||||
}
|
||||
if (type === 'customEventStart') {
|
||||
const { documentName, eventName, payload, replyTo, replyId } = msg;
|
||||
const res = await this.handleEventLocally(
|
||||
eventName as Extract<keyof TCE, string>,
|
||||
documentName,
|
||||
payload,
|
||||
);
|
||||
const reply: RSAMessageCustomEventComplete = {
|
||||
type: 'customEventComplete',
|
||||
replyId,
|
||||
payload: res,
|
||||
};
|
||||
this.pub.publish(`${replyTo}`, this.pack(reply)).catch(() => {});
|
||||
return;
|
||||
}
|
||||
if (type === 'customEventComplete') {
|
||||
const { replyId, payload } = msg;
|
||||
const resolveFn = this.pendingReplies[replyId];
|
||||
if (!resolveFn) return;
|
||||
delete this.pendingReplies[replyId];
|
||||
resolveFn(payload);
|
||||
return;
|
||||
}
|
||||
if (type === 'pong') {
|
||||
this.emitPong(msg.socketId);
|
||||
return;
|
||||
}
|
||||
const { socketId } = msg;
|
||||
const socket = this.originSockets[socketId];
|
||||
if (!socket) {
|
||||
// origin socket already cleaned up
|
||||
return;
|
||||
}
|
||||
if (type === 'close') {
|
||||
socket.close(msg.code, msg.reason);
|
||||
} else if (type === 'ping') {
|
||||
const { respondTo } = msg;
|
||||
const pong: RSAMessagePong = { type: 'pong', socketId };
|
||||
this.pub.publish(respondTo, this.pack(pong)).catch(() => {});
|
||||
} else if (type === 'send') {
|
||||
socket.send(msg.message);
|
||||
}
|
||||
};
|
||||
|
||||
async maintainLock(documentName: string) {
|
||||
this.locks[documentName] = setInterval(() => {
|
||||
this.pub.set(
|
||||
this.getKey(documentName),
|
||||
this.serverId,
|
||||
'PX',
|
||||
this.lockTTL,
|
||||
);
|
||||
}, this.lockTTL / 2);
|
||||
}
|
||||
|
||||
async releaseLock(documentName: string) {
|
||||
clearInterval(this.locks[documentName]);
|
||||
delete this.locks[documentName];
|
||||
return this.pub.del(this.getKey(documentName));
|
||||
}
|
||||
|
||||
private async handleEventLocally<TName extends Extract<keyof TCE, string>>(
|
||||
eventName: TName,
|
||||
documentName: string,
|
||||
payload: unknown,
|
||||
) {
|
||||
const handler = this.customEvents[eventName];
|
||||
if (!handler) throw new Error(`Invalid eventName: ${eventName}`);
|
||||
const result = await handler(documentName, payload);
|
||||
return result as Promise<ReturnType<TCE[TName]>>;
|
||||
}
|
||||
|
||||
async handleEvent<TName extends Extract<keyof TCE, string>>(
|
||||
eventName: TName,
|
||||
documentName: string,
|
||||
payload: unknown,
|
||||
) {
|
||||
const isDocLoadedOnInstance = this.instance.documents.has(documentName);
|
||||
|
||||
if (isDocLoadedOnInstance) {
|
||||
return this.handleEventLocally(eventName, documentName, payload);
|
||||
}
|
||||
|
||||
const proxyTo = await this.getOrClaimLockThrottled(documentName);
|
||||
if (proxyTo && proxyTo !== this.serverId) {
|
||||
++this.replyIdCounter; // bug in biome thinks this.replyIdCounter is not used if written on the line below
|
||||
const replyId = this.replyIdCounter;
|
||||
// another server owns the doc
|
||||
const proxyMessage: RSAMessageCustomEventStart = {
|
||||
eventName,
|
||||
documentName,
|
||||
payload,
|
||||
replyTo: `${this.msgChannel}:${this.serverId}`,
|
||||
replyId,
|
||||
type: 'customEventStart',
|
||||
};
|
||||
const msg = this.pack(proxyMessage);
|
||||
this.pub.publish(`${this.msgChannel}:${proxyTo}`, msg).catch(() => {});
|
||||
// @ts-ignore
|
||||
const { promise, resolve, reject } = Promise.withResolvers();
|
||||
const timeoutId = setTimeout(() => {
|
||||
delete this.pendingReplies[replyId];
|
||||
reject('TIMEOUT');
|
||||
}, this.customEventTTL);
|
||||
this.pendingReplies[replyId] = (result: unknown) => {
|
||||
clearTimeout(timeoutId);
|
||||
resolve(result);
|
||||
};
|
||||
return promise as Promise<ReturnType<TCE[TName]>>;
|
||||
}
|
||||
// This server owns the document, but hocuspocus hasn't loaded it yet
|
||||
return this.handleEventLocally(eventName, documentName, payload);
|
||||
}
|
||||
|
||||
async lockDocument(documentName: string) {
|
||||
const proxyTo = await this.getOrClaimLockThrottled(documentName);
|
||||
if (proxyTo && proxyTo !== this.serverId) {
|
||||
throw new Error(`Could not lock document: ${documentName}`);
|
||||
}
|
||||
this.maintainLock(documentName);
|
||||
return () => this.releaseLock(documentName);
|
||||
}
|
||||
|
||||
/* WebSocket Server Hooks */
|
||||
onSocketOpen(
|
||||
ws: BaseWebSocket,
|
||||
serializedHTTPRequest: SerializedHTTPRequest,
|
||||
context = {},
|
||||
) {
|
||||
const socketId = serializedHTTPRequest.headers['sec-websocket-key'];
|
||||
this.originSockets[socketId] = ws;
|
||||
this.instance.handleConnection(
|
||||
ws as unknown as WebSocket,
|
||||
serializedHTTPRequest as unknown as IncomingMessage,
|
||||
context,
|
||||
);
|
||||
}
|
||||
|
||||
async onSocketMessage(
|
||||
ws: BaseWebSocket,
|
||||
serializedHTTPRequest: SerializedHTTPRequest,
|
||||
detachableMsg: ArrayBuffer,
|
||||
) {
|
||||
// @ts-ignore
|
||||
const message = new Uint8Array(detachableMsg.slice());
|
||||
const tmpMsg = new SocketIncomingMessage(detachableMsg);
|
||||
const documentName = readVarString(tmpMsg.decoder);
|
||||
const isDocLoadedOnInstance = this.instance.documents.has(documentName);
|
||||
|
||||
if (isDocLoadedOnInstance) {
|
||||
ws.emit('message', message);
|
||||
return;
|
||||
}
|
||||
|
||||
const proxyTo = await this.getOrClaimLockThrottled(documentName);
|
||||
if (proxyTo && proxyTo !== this.serverId) {
|
||||
// another server owns the doc
|
||||
const proxyMessage: RSAMessageProxy = {
|
||||
serializedHTTPRequest: serializedHTTPRequest,
|
||||
replyTo: `${this.msgChannel}:${this.serverId}`,
|
||||
message,
|
||||
type: 'proxy',
|
||||
};
|
||||
const msg = this.pack(proxyMessage);
|
||||
this.pub.publish(`${this.msgChannel}:${proxyTo}`, msg).catch(() => {});
|
||||
return;
|
||||
}
|
||||
// This server owns the document, but hocuspocus hasn't loaded it yet
|
||||
ws.emit('message', message);
|
||||
}
|
||||
|
||||
onSocketClose(socketId: string, code?: number, reason?: ArrayBuffer) {
|
||||
const socket = this.originSockets[socketId];
|
||||
if (!socket) return;
|
||||
delete this.originSockets[socketId];
|
||||
socket.emit('close', code, reason);
|
||||
const msg: RSAMessageCloseProxy = { type: 'closeProxy', socketId };
|
||||
this.pub.publish(this.msgChannel, this.pack(msg)).catch(() => {});
|
||||
}
|
||||
|
||||
/* Hocuspocus hooks */
|
||||
async onConfigure({ instance }: onConfigurePayload) {
|
||||
this.instance = instance;
|
||||
}
|
||||
|
||||
async onLoadDocument(data: onLoadDocumentPayload) {
|
||||
const { documentName } = data;
|
||||
// Refresh the lock TTL
|
||||
this.maintainLock(documentName);
|
||||
}
|
||||
|
||||
async afterUnloadDocument(data: afterUnloadDocumentPayload) {
|
||||
const { documentName } = data;
|
||||
this.releaseLock(documentName);
|
||||
// Broadcast to cluster to immediately remove the cached redis value
|
||||
const msg: RSAMessageUnload = { type: 'unload', documentName };
|
||||
this.pub.publish(this.msgChannel, this.pack(msg)).catch(() => {});
|
||||
}
|
||||
async onDestroy() {
|
||||
this.pub.disconnect(false);
|
||||
this.sub.disconnect(false);
|
||||
}
|
||||
}
|
||||
@@ -0,0 +1,47 @@
|
||||
import { EventEmitter } from 'events';
|
||||
import type WebSocket from 'ws';
|
||||
|
||||
/**
|
||||
* Wrapper around ws WebSocket that only receives events via emit().
|
||||
* This prevents double-handling when used with RedisSyncExtension.
|
||||
*/
|
||||
export class WsSocketWrapper extends EventEmitter {
|
||||
private ws: WebSocket;
|
||||
readyState = 1;
|
||||
|
||||
constructor(ws: WebSocket) {
|
||||
super();
|
||||
this.ws = ws;
|
||||
this.once('close', () => {
|
||||
this.readyState = 3;
|
||||
});
|
||||
}
|
||||
|
||||
close(code?: number, reason?: string) {
|
||||
if (this.readyState !== 1) return;
|
||||
this.readyState = 3;
|
||||
try {
|
||||
this.ws.close(code, reason);
|
||||
} catch (e) {
|
||||
// Socket already closed
|
||||
}
|
||||
}
|
||||
|
||||
ping() {
|
||||
if (this.readyState !== 1) return;
|
||||
try {
|
||||
this.ws.ping();
|
||||
} catch (e) {
|
||||
// Socket already closed
|
||||
}
|
||||
}
|
||||
|
||||
send(message: Uint8Array) {
|
||||
if (this.readyState !== 1) return;
|
||||
try {
|
||||
this.ws.send(message);
|
||||
} catch (e) {
|
||||
// Socket already closed
|
||||
}
|
||||
}
|
||||
}
|
||||
Generated
+29
-24
@@ -545,12 +545,18 @@ importers:
|
||||
ldapts:
|
||||
specifier: ^7.4.0
|
||||
version: 7.4.0
|
||||
lib0:
|
||||
specifier: ^0.2.117
|
||||
version: 0.2.117
|
||||
mammoth:
|
||||
specifier: ^1.11.0
|
||||
version: 1.11.0
|
||||
mime-types:
|
||||
specifier: ^2.1.35
|
||||
version: 2.1.35
|
||||
msgpackr:
|
||||
specifier: ^1.11.8
|
||||
version: 1.11.8
|
||||
nanoid:
|
||||
specifier: 3.3.11
|
||||
version: 3.3.11
|
||||
@@ -614,6 +620,9 @@ importers:
|
||||
tmp-promise:
|
||||
specifier: ^3.0.3
|
||||
version: 3.0.3
|
||||
tseep:
|
||||
specifier: ^1.3.1
|
||||
version: 1.3.1
|
||||
typesense:
|
||||
specifier: ^2.1.0
|
||||
version: 2.1.0(@babel/runtime@7.25.6)
|
||||
@@ -7547,13 +7556,8 @@ packages:
|
||||
resolution: {integrity: sha512-+bT2uH4E5LGE7h/n3evcS/sQlJXCpIp6ym8OWJ5eV6+67Dsql/LaaT7qJBAt2rzfoa/5QBGBhxDix1dMt2kQKQ==}
|
||||
engines: {node: '>= 0.8.0'}
|
||||
|
||||
lib0@0.2.114:
|
||||
resolution: {integrity: sha512-gcxmNFzA4hv8UYi8j43uPlQ7CGcyMJ2KQb5kZASw6SnAKAf10hK12i2fjrS3Cl/ugZa5Ui6WwIu1/6MIXiHttQ==}
|
||||
engines: {node: '>=16'}
|
||||
hasBin: true
|
||||
|
||||
lib0@0.2.88:
|
||||
resolution: {integrity: sha512-KyroiEvCeZcZEMx5Ys+b4u4eEBbA1ch7XUaBhYpwa/nPMrzTjUhI4RfcytmQfYoTBPcdyx+FX6WFNIoNuJzJfQ==}
|
||||
lib0@0.2.117:
|
||||
resolution: {integrity: sha512-DeXj9X5xDCjgKLU/7RR+/HQEVzuuEUiwldwOGsHK/sfAfELGWEyTcf0x+uOvCvK3O2zPmZePXWL85vtia6GyZw==}
|
||||
engines: {node: '>=16'}
|
||||
hasBin: true
|
||||
|
||||
@@ -7929,8 +7933,8 @@ packages:
|
||||
resolution: {integrity: sha512-SdzXp4kD/Qf8agZ9+iTu6eql0m3kWm1A2y1hkpTeVNENutaB0BwHlSvAIaMxwntmRUAUjon2V4L8Z/njd0Ct8A==}
|
||||
hasBin: true
|
||||
|
||||
msgpackr@1.11.2:
|
||||
resolution: {integrity: sha512-F9UngXRlPyWCDEASDpTf6c9uNhGPTqnTeLVt7bN+bU1eajoR/8V9ys2BRaV5C/e5ihE6sJ9uPIKaYt6bFuO32g==}
|
||||
msgpackr@1.11.8:
|
||||
resolution: {integrity: sha512-bC4UGzHhVvgDNS7kn9tV8fAucIYUBuGojcaLiz7v+P63Lmtm0Xeji8B/8tYKddALXxJLpwIeBmUN3u64C4YkRA==}
|
||||
|
||||
multimath@2.0.0:
|
||||
resolution: {integrity: sha512-toRx66cAMJ+Ccz7pMIg38xSIrtnbozk0dchXezwQDMgQmbGpfxjtv68H+L00iFL8hxDaVjrmwAFSb3I6bg8Q2g==}
|
||||
@@ -9628,6 +9632,9 @@ packages:
|
||||
resolution: {integrity: sha512-NoZ4roiN7LnbKn9QqE1amc9DJfzvZXxF4xDavcOWt1BPkdx+m+0gJuPM+S0vCe7zTJMYUP0R8pO2XMr+Y8oLIg==}
|
||||
engines: {node: '>=6'}
|
||||
|
||||
tseep@1.3.1:
|
||||
resolution: {integrity: sha512-ZPtfk1tQnZVyr7BPtbJ93qaAh2lZuIOpTMjhrYa4XctT8xe7t4SAW9LIxrySDuYMsfNNayE51E/WNGrNVgVicQ==}
|
||||
|
||||
tslib@2.8.0:
|
||||
resolution: {integrity: sha512-jWVzBLplnCmoaTr13V9dYbiQ99wvZRd0vNWaDRg+aVYRcjDF3nDksxFDE/+fkXnKhpnUUkmx5pK/v8mCtLVqZA==}
|
||||
|
||||
@@ -12616,7 +12623,7 @@ snapshots:
|
||||
|
||||
'@hocuspocus/common@3.4.3':
|
||||
dependencies:
|
||||
lib0: 0.2.114
|
||||
lib0: 0.2.117
|
||||
|
||||
'@hocuspocus/extension-redis@3.4.3(y-protocols@1.0.6(yjs@13.6.29))(yjs@13.6.29)':
|
||||
dependencies:
|
||||
@@ -12636,7 +12643,7 @@ snapshots:
|
||||
dependencies:
|
||||
'@hocuspocus/common': 3.4.3
|
||||
'@lifeomic/attempt': 3.0.3
|
||||
lib0: 0.2.114
|
||||
lib0: 0.2.117
|
||||
ws: 8.19.0
|
||||
y-protocols: 1.0.6(yjs@13.6.29)
|
||||
yjs: 13.6.29
|
||||
@@ -12650,7 +12657,7 @@ snapshots:
|
||||
async-lock: 1.4.1
|
||||
async-mutex: 0.5.0
|
||||
kleur: 4.1.5
|
||||
lib0: 0.2.114
|
||||
lib0: 0.2.117
|
||||
ws: 8.19.0
|
||||
y-protocols: 1.0.6(yjs@13.6.29)
|
||||
yjs: 13.6.29
|
||||
@@ -14824,7 +14831,7 @@ snapshots:
|
||||
|
||||
'@tiptap/y-tiptap@3.0.1(prosemirror-model@1.25.1)(prosemirror-state@1.4.3)(prosemirror-view@1.40.0)(y-protocols@1.0.6(yjs@13.6.29))(yjs@13.6.29)':
|
||||
dependencies:
|
||||
lib0: 0.2.114
|
||||
lib0: 0.2.117
|
||||
prosemirror-model: 1.25.1
|
||||
prosemirror-state: 1.4.3
|
||||
prosemirror-view: 1.40.0
|
||||
@@ -16026,7 +16033,7 @@ snapshots:
|
||||
dependencies:
|
||||
cron-parser: 4.9.0
|
||||
ioredis: 5.8.2
|
||||
msgpackr: 1.11.2
|
||||
msgpackr: 1.11.8
|
||||
node-abort-controller: 3.1.1
|
||||
semver: 7.7.2
|
||||
tslib: 2.8.1
|
||||
@@ -18630,11 +18637,7 @@ snapshots:
|
||||
prelude-ls: 1.2.1
|
||||
type-check: 0.4.0
|
||||
|
||||
lib0@0.2.114:
|
||||
dependencies:
|
||||
isomorphic.js: 0.2.5
|
||||
|
||||
lib0@0.2.88:
|
||||
lib0@0.2.117:
|
||||
dependencies:
|
||||
isomorphic.js: 0.2.5
|
||||
|
||||
@@ -19109,7 +19112,7 @@ snapshots:
|
||||
'@msgpackr-extract/msgpackr-extract-win32-x64': 3.0.2
|
||||
optional: true
|
||||
|
||||
msgpackr@1.11.2:
|
||||
msgpackr@1.11.8:
|
||||
optionalDependencies:
|
||||
msgpackr-extract: 3.0.2
|
||||
|
||||
@@ -20963,6 +20966,8 @@ snapshots:
|
||||
minimist: 1.2.8
|
||||
strip-bom: 3.0.0
|
||||
|
||||
tseep@1.3.1: {}
|
||||
|
||||
tslib@2.8.0: {}
|
||||
|
||||
tslib@2.8.1: {}
|
||||
@@ -21435,12 +21440,12 @@ snapshots:
|
||||
|
||||
y-indexeddb@9.0.12(yjs@13.6.29):
|
||||
dependencies:
|
||||
lib0: 0.2.88
|
||||
lib0: 0.2.117
|
||||
yjs: 13.6.29
|
||||
|
||||
y-prosemirror@1.3.7(prosemirror-model@1.25.1)(prosemirror-state@1.4.3)(prosemirror-view@1.40.0)(y-protocols@1.0.6(yjs@13.6.29))(yjs@13.6.29):
|
||||
dependencies:
|
||||
lib0: 0.2.114
|
||||
lib0: 0.2.117
|
||||
prosemirror-model: 1.25.1
|
||||
prosemirror-state: 1.4.3
|
||||
prosemirror-view: 1.40.0
|
||||
@@ -21449,7 +21454,7 @@ snapshots:
|
||||
|
||||
y-protocols@1.0.6(yjs@13.6.29):
|
||||
dependencies:
|
||||
lib0: 0.2.114
|
||||
lib0: 0.2.117
|
||||
yjs: 13.6.29
|
||||
|
||||
y18n@4.0.3: {}
|
||||
@@ -21502,7 +21507,7 @@ snapshots:
|
||||
|
||||
yjs@13.6.29:
|
||||
dependencies:
|
||||
lib0: 0.2.114
|
||||
lib0: 0.2.117
|
||||
|
||||
yn@3.1.1: {}
|
||||
|
||||
|
||||
Reference in New Issue
Block a user