fix: graceful collab shutdown

This commit is contained in:
Philipinho
2026-01-25 18:59:14 +00:00
parent e755207c3b
commit 81cceb483a
5 changed files with 47 additions and 20 deletions
@@ -30,14 +30,22 @@ export class CollabWsAdapter {
return this.wss;
}
public destroy() {
public close() {
try {
this.wss.clients.forEach((client) => {
client.terminate();
});
this.wss.close();
} catch (err) {
console.error(err);
}
}
public destroy() {
try {
this.wss.close();
this.wss.clients.forEach((client) => {
client.terminate();
});
} catch (err) {
console.error(err);
}
}
}
@@ -21,10 +21,10 @@ import { pack, unpack } from 'msgpackr';
@Injectable()
export class CollaborationGateway {
private hocuspocus: Hocuspocus;
private readonly hocuspocus: Hocuspocus;
private redisConfig: RedisConfig;
private redisSync: RedisSyncExtension<{}> | null = null;
private useRedisSync: boolean;
private readonly redisSync: RedisSyncExtension<{}> | null = null;
private readonly useRedisSync: boolean;
constructor(
private authenticationExtension: AuthenticationExtension,
@@ -122,6 +122,24 @@ export class CollaborationGateway {
}
async destroy(): Promise<void> {
//await this.hocuspocus.destroy();
await new Promise((r) => setTimeout(r, 10000));
// eslint-disable-next-line no-async-promise-executor
await new Promise(async (resolve) => {
try {
// Wait for all documents to unload
this.hocuspocus.configuration.extensions.push({
async afterUnloadDocument({ instance }) {
if (instance.getDocumentsCount() === 0) resolve('');
},
});
if (this.hocuspocus.getDocumentsCount() === 0) resolve('');
this.hocuspocus.closeConnections();
} catch (error) {
console.error(error);
}
});
await this.hocuspocus.hooks('onDestroy', { instance: this.hocuspocus });
}
}
@@ -51,11 +51,8 @@ export class CollaborationModule implements OnModuleInit, OnModuleDestroy {
}
async onModuleDestroy(): Promise<void> {
if (this.collaborationGateway) {
await this.collaborationGateway.destroy();
}
if (this.collabWsAdapter) {
this.collabWsAdapter.destroy();
}
this.collabWsAdapter?.close();
await this.collaborationGateway?.destroy();
this.collabWsAdapter?.destroy();
}
}
@@ -9,11 +9,11 @@ import { Injectable, Logger } from '@nestjs/common';
export class LoggerExtension implements Extension {
private readonly logger = new Logger('Collab' + LoggerExtension.name);
async onDisconnect(data: onDisconnectPayload) {
this.logger.debug(`User disconnected from "${data.documentName}".`);
}
async afterUnloadDocument(data: onLoadDocumentPayload) {
this.logger.debug('Unloaded ' + data.documentName + ' from memory');
}
async onDisconnect(data: onDisconnectPayload) {
this.logger.debug('User disconnected from ' + data.documentName);
}
}
@@ -275,7 +275,9 @@ export class RedisSyncExtension<TCE extends CustomEvents> implements Extension {
const proxyTo = await this.getOrClaimLockThrottled(documentName);
if (proxyTo && proxyTo !== this.serverId) {
this.logger.debug(`Doc "${documentName}" owned by server ${proxyTo}, forwarding event "${eventName}"`);
this.logger.debug(
`Doc "${documentName}" owned by server ${proxyTo}, forwarding event "${eventName}"`,
);
++this.replyIdCounter; // bug in biome thinks this.replyIdCounter is not used if written on the line below
const replyId = this.replyIdCounter;
// another server owns the doc
@@ -347,7 +349,9 @@ export class RedisSyncExtension<TCE extends CustomEvents> implements Extension {
const proxyTo = await this.getOrClaimLockThrottled(documentName);
if (proxyTo && proxyTo !== this.serverId) {
this.logger.debug(`Doc "${documentName}" owned by server ${proxyTo}, proxying message`);
this.logger.debug(
`Doc "${documentName}" owned by server ${proxyTo}, proxying message`,
);
// another server owns the doc
const proxyMessage: RSAMessageProxy = {
serializedHTTPRequest: serializedHTTPRequest,