From 5f0afd6f9fbab21254cd9976ce693e019c4e9b61 Mon Sep 17 00:00:00 2001 From: Philipinho <16838612+Philipinho@users.noreply.github.com> Date: Mon, 9 Feb 2026 09:04:18 -0800 Subject: [PATCH] Refactor: use queue for page history --- .../src/collaboration/collaboration.module.ts | 4 +- apps/server/src/collaboration/constants.ts | 3 + .../extensions/persistence.extension.ts | 37 +++++++--- .../listeners/history.listener.ts | 52 -------------- .../processors/history.processor.ts | 70 +++++++++++++++++++ .../queue/constants/queue.constants.ts | 3 + .../queue/constants/queue.interface.ts | 4 ++ .../src/integrations/queue/queue.module.ts | 8 +++ 8 files changed, 116 insertions(+), 65 deletions(-) create mode 100644 apps/server/src/collaboration/constants.ts delete mode 100644 apps/server/src/collaboration/listeners/history.listener.ts create mode 100644 apps/server/src/collaboration/processors/history.processor.ts diff --git a/apps/server/src/collaboration/collaboration.module.ts b/apps/server/src/collaboration/collaboration.module.ts index e9374c53..f11473ac 100644 --- a/apps/server/src/collaboration/collaboration.module.ts +++ b/apps/server/src/collaboration/collaboration.module.ts @@ -7,7 +7,7 @@ import { CollabWsAdapter } from './adapter/collab-ws.adapter'; import { IncomingMessage } from 'http'; import { WebSocket } from 'ws'; import { TokenModule } from '../core/auth/token.module'; -import { HistoryListener } from './listeners/history.listener'; +import { HistoryProcessor } from './processors/history.processor'; import { LoggerExtension } from './extensions/logger.extension'; import { CollaborationHandler } from './collaboration.handler'; @@ -17,7 +17,7 @@ import { CollaborationHandler } from './collaboration.handler'; AuthenticationExtension, PersistenceExtension, LoggerExtension, - HistoryListener, + HistoryProcessor, CollaborationHandler, ], exports: [CollaborationGateway], diff --git a/apps/server/src/collaboration/constants.ts b/apps/server/src/collaboration/constants.ts new file mode 100644 index 00000000..0e49f32c --- /dev/null +++ b/apps/server/src/collaboration/constants.ts @@ -0,0 +1,3 @@ +export const HISTORY_INTERVAL = 4 * 60 * 1000; +export const HISTORY_FAST_INTERVAL = 60 * 1000; +export const HISTORY_FAST_THRESHOLD = 5 * 60 * 1000; diff --git a/apps/server/src/collaboration/extensions/persistence.extension.ts b/apps/server/src/collaboration/extensions/persistence.extension.ts index 54c4a89e..72aa32de 100644 --- a/apps/server/src/collaboration/extensions/persistence.extension.ts +++ b/apps/server/src/collaboration/extensions/persistence.extension.ts @@ -13,7 +13,6 @@ import { PageRepo } from '@docmost/db/repos/page/page.repo'; import { InjectKysely } from 'nestjs-kysely'; import { KyselyDB } from '@docmost/db/types/kysely.types'; import { executeTx } from '@docmost/db/utils'; -import { EventEmitter2 } from '@nestjs/event-emitter'; import { InjectQueue } from '@nestjs/bullmq'; import { QueueJob, QueueName } from '../../integrations/queue/constants'; import { Queue } from 'bullmq'; @@ -22,8 +21,16 @@ import { extractPageMentions, } from '../../common/helpers/prosemirror/utils'; import { isDeepStrictEqual } from 'node:util'; -import { IPageBacklinkJob } from '../../integrations/queue/constants/queue.interface'; +import { + IPageBacklinkJob, + IPageHistoryJob, +} from '../../integrations/queue/constants/queue.interface'; import { Page } from '@docmost/db/types/entity.types'; +import { + HISTORY_FAST_INTERVAL, + HISTORY_FAST_THRESHOLD, + HISTORY_INTERVAL, +} from '../constants'; @Injectable() export class PersistenceExtension implements Extension { @@ -33,9 +40,9 @@ export class PersistenceExtension implements Extension { constructor( private readonly pageRepo: PageRepo, @InjectKysely() private readonly db: KyselyDB, - private eventEmitter: EventEmitter2, @InjectQueue(QueueName.GENERAL_QUEUE) private generalQueue: Queue, @InjectQueue(QueueName.AI_QUEUE) private aiQueue: Queue, + @InjectQueue(QueueName.HISTORY_QUEUE) private historyQueue: Queue, ) {} async onLoadDocument(data: onLoadDocumentPayload) { @@ -153,14 +160,6 @@ export class PersistenceExtension implements Extension { } if (page) { - this.eventEmitter.emit('collab.page.updated', { - page: { - ...page, - content: tiptapJson, - lastUpdatedById: context.user.id, - }, - }); - const mentions = extractMentions(tiptapJson); const pageMentions = extractPageMentions(mentions); @@ -174,6 +173,8 @@ export class PersistenceExtension implements Extension { pageIds: [pageId], workspaceId: page.workspaceId, }); + + await this.enqueuePageHistory(page); } } @@ -193,4 +194,18 @@ export class PersistenceExtension implements Extension { const documentName = data.documentName; this.contributors.delete(documentName); } + + private async enqueuePageHistory(page: Page): Promise { + const pageAge = Date.now() - new Date(page.createdAt).getTime(); + const delay = + pageAge < HISTORY_FAST_THRESHOLD + ? HISTORY_FAST_INTERVAL + : HISTORY_INTERVAL; + + await this.historyQueue.add( + QueueJob.PAGE_HISTORY, + { pageId: page.id } as IPageHistoryJob, + { jobId: page.id, delay }, + ); + } } diff --git a/apps/server/src/collaboration/listeners/history.listener.ts b/apps/server/src/collaboration/listeners/history.listener.ts deleted file mode 100644 index e0d40838..00000000 --- a/apps/server/src/collaboration/listeners/history.listener.ts +++ /dev/null @@ -1,52 +0,0 @@ -import { Injectable, Logger } from '@nestjs/common'; -import { OnEvent } from '@nestjs/event-emitter'; -import { PageHistoryRepo } from '@docmost/db/repos/page/page-history.repo'; -import { Page } from '@docmost/db/types/entity.types'; -import { isDeepStrictEqual } from 'node:util'; -import { EnvironmentService } from '../../integrations/environment/environment.service'; - -export class UpdatedPageEvent { - page: Page; -} - -@Injectable() -export class HistoryListener { - private readonly logger = new Logger(HistoryListener.name); - - constructor( - private readonly pageHistoryRepo: PageHistoryRepo, - private readonly environmentService: EnvironmentService, - ) {} - - @OnEvent('collab.page.updated') - async handleCreatePageHistory(event: UpdatedPageEvent) { - const { page } = event; - - const pageCreationTime = new Date(page.createdAt).getTime(); - const currentTime = Date.now(); - const FIVE_MINUTES = this.environmentService.isDevelopment() - ? 60 * 1000 - : 5 * 60 * 1000; - - if (currentTime - pageCreationTime < FIVE_MINUTES) { - return; - } - - const lastHistory = await this.pageHistoryRepo.findPageLastHistory(page.id, { - includeContent: true, - }); - - if ( - !lastHistory || - (!isDeepStrictEqual(lastHistory.content, page.content) && - currentTime - new Date(lastHistory.createdAt).getTime() >= FIVE_MINUTES) - ) { - try { - await this.pageHistoryRepo.saveHistory(page); - this.logger.debug(`New history created for: ${page.id}`); - } catch (err) { - this.logger.error(`Failed to create history for page: ${page.id}`, err); - } - } - } -} diff --git a/apps/server/src/collaboration/processors/history.processor.ts b/apps/server/src/collaboration/processors/history.processor.ts new file mode 100644 index 00000000..6b28e127 --- /dev/null +++ b/apps/server/src/collaboration/processors/history.processor.ts @@ -0,0 +1,70 @@ +import { Logger, OnModuleDestroy } from '@nestjs/common'; +import { OnWorkerEvent, Processor, WorkerHost } from '@nestjs/bullmq'; +import { Job } from 'bullmq'; +import { QueueJob, QueueName } from '../../integrations/queue/constants'; +import { IPageHistoryJob } from '../../integrations/queue/constants/queue.interface'; +import { PageHistoryRepo } from '@docmost/db/repos/page/page-history.repo'; +import { PageRepo } from '@docmost/db/repos/page/page.repo'; +import { isDeepStrictEqual } from 'node:util'; + +@Processor(QueueName.HISTORY_QUEUE) +export class HistoryProcessor extends WorkerHost implements OnModuleDestroy { + private readonly logger = new Logger(HistoryProcessor.name); + + constructor( + private readonly pageHistoryRepo: PageHistoryRepo, + private readonly pageRepo: PageRepo, + ) { + super(); + } + + async process(job: Job): Promise { + if (job.name !== QueueJob.PAGE_HISTORY) return; + + try { + const { pageId } = job.data; + + const page = await this.pageRepo.findById(pageId, { + includeContent: true, + }); + + if (!page) { + this.logger.warn(`Page ${pageId} not found, skipping history`); + return; + } + + const lastHistory = await this.pageHistoryRepo.findPageLastHistory( + pageId, + { includeContent: true }, + ); + + if ( + !lastHistory || + !isDeepStrictEqual(lastHistory.content, page.content) + ) { + await this.pageHistoryRepo.saveHistory(page); + this.logger.debug(`History created for page: ${pageId}`); + } + } catch (err) { + throw err; + } + } + + @OnWorkerEvent('active') + onActive(job: Job) { + this.logger.debug(`Processing ${job.name} for page: ${job.data.pageId}`); + } + + @OnWorkerEvent('failed') + onError(job: Job) { + this.logger.error( + `Failed ${job.name} for page: ${job.data.pageId}. Reason: ${job.failedReason}`, + ); + } + + async onModuleDestroy(): Promise { + if (this.worker) { + await this.worker.close(); + } + } +} diff --git a/apps/server/src/integrations/queue/constants/queue.constants.ts b/apps/server/src/integrations/queue/constants/queue.constants.ts index 5c7aa29a..c194a28c 100644 --- a/apps/server/src/integrations/queue/constants/queue.constants.ts +++ b/apps/server/src/integrations/queue/constants/queue.constants.ts @@ -6,6 +6,7 @@ export enum QueueName { FILE_TASK_QUEUE = '{file-task-queue}', SEARCH_QUEUE = '{search-queue}', AI_QUEUE = '{ai-queue}', + HISTORY_QUEUE = '{history-queue}', } export enum QueueJob { @@ -58,4 +59,6 @@ export enum QueueJob { GENERATE_PAGE_EMBEDDINGS = 'generate-page-embeddings', DELETE_PAGE_EMBEDDINGS = 'delete-page-embeddings', + + PAGE_HISTORY = 'page-history', } diff --git a/apps/server/src/integrations/queue/constants/queue.interface.ts b/apps/server/src/integrations/queue/constants/queue.interface.ts index ce105f1c..cfcd7148 100644 --- a/apps/server/src/integrations/queue/constants/queue.interface.ts +++ b/apps/server/src/integrations/queue/constants/queue.interface.ts @@ -9,4 +9,8 @@ export interface IPageBacklinkJob { export interface IStripeSeatsSyncJob { workspaceId: string; +} + +export interface IPageHistoryJob { + pageId: string; } \ No newline at end of file diff --git a/apps/server/src/integrations/queue/queue.module.ts b/apps/server/src/integrations/queue/queue.module.ts index 6787e010..54037bce 100644 --- a/apps/server/src/integrations/queue/queue.module.ts +++ b/apps/server/src/integrations/queue/queue.module.ts @@ -73,6 +73,14 @@ import { BacklinksProcessor } from './processors/backlinks.processor'; attempts: 1, }, }), + BullModule.registerQueue({ + name: QueueName.HISTORY_QUEUE, + defaultJobOptions: { + removeOnComplete: true, + removeOnFail: true, + attempts: 2, + }, + }), ], exports: [BullModule], providers: [BacklinksProcessor],