mirror of
https://github.com/docmost/docmost.git
synced 2026-05-07 06:23:06 +08:00
move backlinks queue-up to history processor
This commit is contained in:
@@ -18,12 +18,10 @@ import { QueueJob, QueueName } from '../../integrations/queue/constants';
|
|||||||
import { Queue } from 'bullmq';
|
import { Queue } from 'bullmq';
|
||||||
import {
|
import {
|
||||||
extractMentions,
|
extractMentions,
|
||||||
extractPageMentions,
|
|
||||||
extractUserMentions,
|
extractUserMentions,
|
||||||
} from '../../common/helpers/prosemirror/utils';
|
} from '../../common/helpers/prosemirror/utils';
|
||||||
import { isDeepStrictEqual } from 'node:util';
|
import { isDeepStrictEqual } from 'node:util';
|
||||||
import {
|
import {
|
||||||
IPageBacklinkJob,
|
|
||||||
IPageHistoryJob,
|
IPageHistoryJob,
|
||||||
IPageMentionNotificationJob,
|
IPageMentionNotificationJob,
|
||||||
} from '../../integrations/queue/constants/queue.interface';
|
} from '../../integrations/queue/constants/queue.interface';
|
||||||
@@ -43,7 +41,6 @@ export class PersistenceExtension implements Extension {
|
|||||||
constructor(
|
constructor(
|
||||||
private readonly pageRepo: PageRepo,
|
private readonly pageRepo: PageRepo,
|
||||||
@InjectKysely() private readonly db: KyselyDB,
|
@InjectKysely() private readonly db: KyselyDB,
|
||||||
@InjectQueue(QueueName.GENERAL_QUEUE) private generalQueue: Queue,
|
|
||||||
@InjectQueue(QueueName.AI_QUEUE) private aiQueue: Queue,
|
@InjectQueue(QueueName.AI_QUEUE) private aiQueue: Queue,
|
||||||
@InjectQueue(QueueName.HISTORY_QUEUE) private historyQueue: Queue,
|
@InjectQueue(QueueName.HISTORY_QUEUE) private historyQueue: Queue,
|
||||||
@InjectQueue(QueueName.NOTIFICATION_QUEUE) private notificationQueue: Queue,
|
@InjectQueue(QueueName.NOTIFICATION_QUEUE) private notificationQueue: Queue,
|
||||||
@@ -165,13 +162,6 @@ export class PersistenceExtension implements Extension {
|
|||||||
await this.collabHistory.addContributors(pageId, editingUserIds);
|
await this.collabHistory.addContributors(pageId, editingUserIds);
|
||||||
|
|
||||||
const mentions = extractMentions(tiptapJson);
|
const mentions = extractMentions(tiptapJson);
|
||||||
const pageMentions = extractPageMentions(mentions);
|
|
||||||
|
|
||||||
await this.generalQueue.add(QueueJob.PAGE_BACKLINKS, {
|
|
||||||
pageId: pageId,
|
|
||||||
workspaceId: page.workspaceId,
|
|
||||||
mentions: pageMentions,
|
|
||||||
} as IPageBacklinkJob);
|
|
||||||
|
|
||||||
const userMentions = extractUserMentions(mentions);
|
const userMentions = extractUserMentions(mentions);
|
||||||
const oldMentions = page.content ? extractMentions(page.content) : [];
|
const oldMentions = page.content ? extractMentions(page.content) : [];
|
||||||
|
|||||||
@@ -4,9 +4,14 @@ import { InjectQueue } from '@nestjs/bullmq';
|
|||||||
import { Job, Queue } from 'bullmq';
|
import { Job, Queue } from 'bullmq';
|
||||||
import { QueueJob, QueueName } from '../../integrations/queue/constants';
|
import { QueueJob, QueueName } from '../../integrations/queue/constants';
|
||||||
import {
|
import {
|
||||||
|
IPageBacklinkJob,
|
||||||
IPageHistoryJob,
|
IPageHistoryJob,
|
||||||
IPageUpdateNotificationJob,
|
IPageUpdateNotificationJob,
|
||||||
} from '../../integrations/queue/constants/queue.interface';
|
} from '../../integrations/queue/constants/queue.interface';
|
||||||
|
import {
|
||||||
|
extractMentions,
|
||||||
|
extractPageMentions,
|
||||||
|
} from '../../common/helpers/prosemirror/utils';
|
||||||
import { PageHistoryRepo } from '@docmost/db/repos/page/page-history.repo';
|
import { PageHistoryRepo } from '@docmost/db/repos/page/page-history.repo';
|
||||||
import { PageRepo } from '@docmost/db/repos/page/page.repo';
|
import { PageRepo } from '@docmost/db/repos/page/page.repo';
|
||||||
import { isDeepStrictEqual } from 'node:util';
|
import { isDeepStrictEqual } from 'node:util';
|
||||||
@@ -23,6 +28,7 @@ export class HistoryProcessor extends WorkerHost implements OnModuleDestroy {
|
|||||||
private readonly collabHistory: CollabHistoryService,
|
private readonly collabHistory: CollabHistoryService,
|
||||||
private readonly watcherService: WatcherService,
|
private readonly watcherService: WatcherService,
|
||||||
@InjectQueue(QueueName.NOTIFICATION_QUEUE) private notificationQueue: Queue,
|
@InjectQueue(QueueName.NOTIFICATION_QUEUE) private notificationQueue: Queue,
|
||||||
|
@InjectQueue(QueueName.GENERAL_QUEUE) private generalQueue: Queue,
|
||||||
) {
|
) {
|
||||||
super();
|
super();
|
||||||
}
|
}
|
||||||
@@ -69,6 +75,21 @@ export class HistoryProcessor extends WorkerHost implements OnModuleDestroy {
|
|||||||
throw err;
|
throw err;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
const mentions = extractMentions(page.content);
|
||||||
|
const pageMentions = extractPageMentions(mentions);
|
||||||
|
|
||||||
|
await this.generalQueue
|
||||||
|
.add(QueueJob.PAGE_BACKLINKS, {
|
||||||
|
pageId,
|
||||||
|
workspaceId: page.workspaceId,
|
||||||
|
mentions: pageMentions,
|
||||||
|
} as IPageBacklinkJob)
|
||||||
|
.catch((err) => {
|
||||||
|
this.logger.error(
|
||||||
|
`Failed to queue backlinks for ${pageId}: ${err.message}`,
|
||||||
|
);
|
||||||
|
});
|
||||||
|
|
||||||
if (contributorIds.length > 0 && lastHistory?.content) {
|
if (contributorIds.length > 0 && lastHistory?.content) {
|
||||||
await this.notificationQueue
|
await this.notificationQueue
|
||||||
.add(QueueJob.PAGE_UPDATED, {
|
.add(QueueJob.PAGE_UPDATED, {
|
||||||
|
|||||||
Reference in New Issue
Block a user