feat: notifications (#1947)

* feat: notifications
* feat: watchers

* improvements

* handle page move for watchers

* make watchers non-blocking

* more
This commit is contained in:
Philip Okugbe
2026-02-14 20:00:38 -08:00
committed by GitHub
parent e0ab9d9b5e
commit 05b3c65b0f
80 changed files with 3071 additions and 238 deletions
@@ -1,135 +0,0 @@
import { Logger, OnModuleDestroy } from '@nestjs/common';
import { OnWorkerEvent, Processor, WorkerHost } from '@nestjs/bullmq';
import { Job } from 'bullmq';
import { QueueJob, QueueName } from '../constants';
import { IPageBacklinkJob } from '../constants/queue.interface';
import { InjectKysely } from 'nestjs-kysely';
import { KyselyDB } from '@docmost/db/types/kysely.types';
import { BacklinkRepo } from '@docmost/db/repos/backlink/backlink.repo';
import { executeTx } from '@docmost/db/utils';
@Processor(QueueName.GENERAL_QUEUE)
export class BacklinksProcessor extends WorkerHost implements OnModuleDestroy {
private readonly logger = new Logger(BacklinksProcessor.name);
constructor(
@InjectKysely() private readonly db: KyselyDB,
private readonly backlinkRepo: BacklinkRepo,
) {
super();
}
async process(job: Job<IPageBacklinkJob, void>): Promise<void> {
try {
const { pageId, mentions, workspaceId } = job.data;
switch (job.name) {
case QueueJob.PAGE_BACKLINKS:
{
await executeTx(this.db, async (trx) => {
const existingBacklinks = await trx
.selectFrom('backlinks')
.select('targetPageId')
.where('sourcePageId', '=', pageId)
.execute();
if (existingBacklinks.length === 0 && mentions.length === 0) {
return;
}
const existingTargetPageIds = existingBacklinks.map(
(backlink) => backlink.targetPageId,
);
const targetPageIds = mentions
.filter((mention) => mention.entityId !== pageId)
.map((mention) => mention.entityId);
// make sure target pages belong to the same workspace
let validTargetPages = [];
if (targetPageIds.length > 0) {
validTargetPages = await trx
.selectFrom('pages')
.select('id')
.where('id', 'in', targetPageIds)
.where('workspaceId', '=', workspaceId)
.execute();
}
const validTargetPageIds = validTargetPages.map(
(page) => page.id,
);
// new backlinks
const backlinksToAdd = validTargetPageIds.filter(
(id) => !existingTargetPageIds.includes(id),
);
// stale backlinks
const backlinksToRemove = existingTargetPageIds.filter(
(existingId) => !validTargetPageIds.includes(existingId),
);
// add new backlinks
if (backlinksToAdd.length > 0) {
const newBacklinks = backlinksToAdd.map((targetPageId) => ({
sourcePageId: pageId,
targetPageId: targetPageId,
workspaceId: workspaceId,
}));
await this.backlinkRepo.insertBacklink(newBacklinks, trx);
this.logger.debug(
`Added ${newBacklinks.length} new backlinks to ${pageId}`,
);
}
// remove stale backlinks
if (backlinksToRemove.length > 0) {
await this.db
.deleteFrom('backlinks')
.where('sourcePageId', '=', pageId)
.where('targetPageId', 'in', backlinksToRemove)
.execute();
this.logger.debug(
`Removed ${backlinksToRemove.length} outdated backlinks from ${pageId}.`,
);
}
});
}
break;
}
} catch (err) {
throw err;
}
}
@OnWorkerEvent('active')
onActive(job: Job) {
if (job.name === QueueJob.PAGE_BACKLINKS) {
this.logger.debug(`Processing ${job.name} job`);
}
}
@OnWorkerEvent('failed')
onError(job: Job) {
if (job.name === QueueJob.PAGE_BACKLINKS) {
this.logger.error(
`Error processing ${job.name} job. Reason: ${job.failedReason}`,
);
}
}
@OnWorkerEvent('completed')
onCompleted(job: Job) {
if (job.name === QueueJob.PAGE_BACKLINKS) {
this.logger.debug(`Completed ${job.name} job`);
}
}
async onModuleDestroy(): Promise<void> {
if (this.worker) {
await this.worker.close();
}
}
}
@@ -0,0 +1,87 @@
import { Logger, OnModuleDestroy } from '@nestjs/common';
import { OnWorkerEvent, Processor, WorkerHost } from '@nestjs/bullmq';
import { Job } from 'bullmq';
import { QueueJob, QueueName } from '../constants';
import {
IAddPageWatchersJob,
IPageBacklinkJob,
} from '../constants/queue.interface';
import { InjectKysely } from 'nestjs-kysely';
import { KyselyDB } from '@docmost/db/types/kysely.types';
import { BacklinkRepo } from '@docmost/db/repos/backlink/backlink.repo';
import {
WatcherRepo,
WatcherType,
} from '@docmost/db/repos/watcher/watcher.repo';
import { InsertableWatcher } from '@docmost/db/types/entity.types';
import { processBacklinks } from '../tasks/backlinks.task';
@Processor(QueueName.GENERAL_QUEUE)
export class GeneralQueueProcessor
extends WorkerHost
implements OnModuleDestroy
{
private readonly logger = new Logger(GeneralQueueProcessor.name);
constructor(
@InjectKysely() private readonly db: KyselyDB,
private readonly backlinkRepo: BacklinkRepo,
private readonly watcherRepo: WatcherRepo,
) {
super();
}
async process(job: Job): Promise<void> {
try {
switch (job.name) {
case QueueJob.ADD_PAGE_WATCHERS: {
const { userIds, pageId, spaceId, workspaceId } =
job.data as IAddPageWatchersJob;
const watchers: InsertableWatcher[] = userIds.map((userId) => ({
userId,
pageId,
spaceId,
workspaceId,
type: WatcherType.PAGE,
addedById: userId,
}));
await this.watcherRepo.insertMany(watchers);
break;
}
case QueueJob.PAGE_BACKLINKS: {
await processBacklinks(
this.db,
this.backlinkRepo,
job.data as IPageBacklinkJob,
);
break;
}
}
} catch (err) {
throw err;
}
}
@OnWorkerEvent('active')
onActive(job: Job) {
this.logger.debug(`Processing ${job.name} job`);
}
@OnWorkerEvent('failed')
onError(job: Job) {
this.logger.error(
`Error processing ${job.name} job. Reason: ${job.failedReason}`,
);
}
@OnWorkerEvent('completed')
onCompleted(job: Job) {
this.logger.debug(`Completed ${job.name} job`);
}
async onModuleDestroy(): Promise<void> {
if (this.worker) {
await this.worker.close();
}
}
}