diff --git a/apps/server/src/collaboration/extensions/persistence.extension.ts b/apps/server/src/collaboration/extensions/persistence.extension.ts index 642d0761..d32e4778 100644 --- a/apps/server/src/collaboration/extensions/persistence.extension.ts +++ b/apps/server/src/collaboration/extensions/persistence.extension.ts @@ -18,12 +18,10 @@ import { QueueJob, QueueName } from '../../integrations/queue/constants'; import { Queue } from 'bullmq'; import { extractMentions, - extractPageMentions, extractUserMentions, } from '../../common/helpers/prosemirror/utils'; import { isDeepStrictEqual } from 'node:util'; import { - IPageBacklinkJob, IPageHistoryJob, IPageMentionNotificationJob, } from '../../integrations/queue/constants/queue.interface'; @@ -43,7 +41,6 @@ export class PersistenceExtension implements Extension { constructor( private readonly pageRepo: PageRepo, @InjectKysely() private readonly db: KyselyDB, - @InjectQueue(QueueName.GENERAL_QUEUE) private generalQueue: Queue, @InjectQueue(QueueName.AI_QUEUE) private aiQueue: Queue, @InjectQueue(QueueName.HISTORY_QUEUE) private historyQueue: Queue, @InjectQueue(QueueName.NOTIFICATION_QUEUE) private notificationQueue: Queue, @@ -165,13 +162,6 @@ export class PersistenceExtension implements Extension { await this.collabHistory.addContributors(pageId, editingUserIds); const mentions = extractMentions(tiptapJson); - const pageMentions = extractPageMentions(mentions); - - await this.generalQueue.add(QueueJob.PAGE_BACKLINKS, { - pageId: pageId, - workspaceId: page.workspaceId, - mentions: pageMentions, - } as IPageBacklinkJob); const userMentions = extractUserMentions(mentions); const oldMentions = page.content ? extractMentions(page.content) : []; diff --git a/apps/server/src/collaboration/processors/history.processor.ts b/apps/server/src/collaboration/processors/history.processor.ts index daec4eaf..d7e27f60 100644 --- a/apps/server/src/collaboration/processors/history.processor.ts +++ b/apps/server/src/collaboration/processors/history.processor.ts @@ -4,9 +4,14 @@ import { InjectQueue } from '@nestjs/bullmq'; import { Job, Queue } from 'bullmq'; import { QueueJob, QueueName } from '../../integrations/queue/constants'; import { + IPageBacklinkJob, IPageHistoryJob, IPageUpdateNotificationJob, } from '../../integrations/queue/constants/queue.interface'; +import { + extractMentions, + extractPageMentions, +} from '../../common/helpers/prosemirror/utils'; import { PageHistoryRepo } from '@docmost/db/repos/page/page-history.repo'; import { PageRepo } from '@docmost/db/repos/page/page.repo'; import { isDeepStrictEqual } from 'node:util'; @@ -23,6 +28,7 @@ export class HistoryProcessor extends WorkerHost implements OnModuleDestroy { private readonly collabHistory: CollabHistoryService, private readonly watcherService: WatcherService, @InjectQueue(QueueName.NOTIFICATION_QUEUE) private notificationQueue: Queue, + @InjectQueue(QueueName.GENERAL_QUEUE) private generalQueue: Queue, ) { super(); } @@ -69,6 +75,21 @@ export class HistoryProcessor extends WorkerHost implements OnModuleDestroy { throw err; } + const mentions = extractMentions(page.content); + const pageMentions = extractPageMentions(mentions); + + await this.generalQueue + .add(QueueJob.PAGE_BACKLINKS, { + pageId, + workspaceId: page.workspaceId, + mentions: pageMentions, + } as IPageBacklinkJob) + .catch((err) => { + this.logger.error( + `Failed to queue backlinks for ${pageId}: ${err.message}`, + ); + }); + if (contributorIds.length > 0 && lastHistory?.content) { await this.notificationQueue .add(QueueJob.PAGE_UPDATED, {