This commit is contained in:
Philipinho
2026-03-30 13:50:54 +01:00
parent 7ad3af0d7e
commit 0698590c8a
7 changed files with 189 additions and 13 deletions
@@ -4,6 +4,7 @@ import { NotificationController } from './notification.controller';
import { NotificationProcessor } from './notification.processor';
import { CommentNotificationService } from './services/comment.notification';
import { PageNotificationService } from './services/page.notification';
import { PageUpdateEmailRateLimiter } from './services/page-update-email-rate-limiter';
@Module({
imports: [],
@@ -13,6 +14,7 @@ import { PageNotificationService } from './services/page.notification';
NotificationProcessor,
CommentNotificationService,
PageNotificationService,
PageUpdateEmailRateLimiter,
],
exports: [NotificationService],
})
@@ -87,6 +87,12 @@ export class NotificationProcessor
break;
}
case QueueJob.PAGE_UPDATE_DIGEST: {
const { userId } = job.data as unknown as { userId: string };
await this.pageNotificationService.processDigest(userId, appUrl);
break;
}
default:
this.logger.warn(`Unknown notification job: ${job.name}`);
}
@@ -0,0 +1,44 @@
import { Injectable } from '@nestjs/common';
import { RedisService } from '@nestjs-labs/nestjs-ioredis';
import type { Redis } from 'ioredis';
const KEY_PREFIX = 'page-update:emails:';
const DIGEST_PREFIX = 'page-update:digest:';
const TTL_SECONDS = 86400; // 24 hours
const MAX_IMMEDIATE_EMAILS = 10;
@Injectable()
export class PageUpdateEmailRateLimiter {
private readonly redis: Redis;
constructor(private readonly redisService: RedisService) {
this.redis = this.redisService.getOrThrow();
}
async canSendEmail(userId: string): Promise<boolean> {
const key = KEY_PREFIX + userId;
const count = await this.redis.incr(key);
await this.redis.expire(key, TTL_SECONDS, 'NX');
return count <= MAX_IMMEDIATE_EMAILS;
}
async addToDigest(userId: string, notificationId: string): Promise<boolean> {
const key = DIGEST_PREFIX + userId;
const isNew = (await this.redis.llen(key)) === 0;
await this.redis.rpush(key, notificationId);
await this.redis.expire(key, TTL_SECONDS);
return isNew;
}
async popDigest(userId: string): Promise<string[]> {
const key = DIGEST_PREFIX + userId;
const [ids] = await this.redis
.multi()
.lrange(key, 0, -1)
.del(key)
.exec();
return (ids?.[1] as string[]) ?? [];
}
}
@@ -1,5 +1,7 @@
import { Injectable } from '@nestjs/common';
import { Injectable, Logger } from '@nestjs/common';
import { InjectKysely } from 'nestjs-kysely';
import { InjectQueue } from '@nestjs/bullmq';
import { Queue } from 'bullmq';
import { KyselyDB } from '@docmost/db/types/kysely.types';
import {
IPageMentionNotificationJob,
@@ -12,15 +14,21 @@ import { NotificationRepo } from '@docmost/db/repos/notification/notification.re
import { SpaceMemberRepo } from '@docmost/db/repos/space/space-member.repo';
import { PagePermissionRepo } from '@docmost/db/repos/page/page-permission.repo';
import { WatcherRepo } from '@docmost/db/repos/watcher/watcher.repo';
import { PageUpdateEmailRateLimiter } from './page-update-email-rate-limiter';
import { PageMentionEmail } from '@docmost/transactional/emails/page-mention-email';
import { PageUpdateEmail } from '@docmost/transactional/emails/page-update-email';
import { PageUpdateDigestEmail } from '@docmost/transactional/emails/page-update-digest-email';
import { PermissionGrantedEmail } from '@docmost/transactional/emails/permission-granted-email';
import { getPageTitle } from '../../../common/helpers';
import { QueueJob, QueueName } from '../../../integrations/queue/constants';
const PAGE_UPDATE_COOLDOWN_HOURS = 7;
const DIGEST_DELAY_MS = 3 * 60 * 60 * 1000; // 3 hours
@Injectable()
export class PageNotificationService {
private readonly logger = new Logger(PageNotificationService.name);
constructor(
@InjectKysely() private readonly db: KyselyDB,
private readonly notificationService: NotificationService,
@@ -28,6 +36,8 @@ export class PageNotificationService {
private readonly spaceMemberRepo: SpaceMemberRepo,
private readonly pagePermissionRepo: PagePermissionRepo,
private readonly watcherRepo: WatcherRepo,
private readonly rateLimiter: PageUpdateEmailRateLimiter,
@InjectQueue(QueueName.NOTIFICATION_QUEUE) private notificationQueue: Queue,
) {}
async processPageMention(data: IPageMentionNotificationJob, appUrl: string) {
@@ -220,17 +230,28 @@ export class PageNotificationService {
});
if (!notification) continue;
await this.notificationService.queueEmail(
userId,
notification.id,
`${actor.name} updated ${pageTitle}`,
PageUpdateEmail({
actorName: actor.name,
pageTitle,
pageUrl: basePageUrl,
}),
NotificationType.PAGE_UPDATED,
);
const canSend = await this.rateLimiter.canSendEmail(userId);
if (canSend) {
await this.notificationService.queueEmail(
userId,
notification.id,
`${actor.name} updated ${pageTitle}`,
PageUpdateEmail({
actorName: actor.name,
pageTitle,
pageUrl: basePageUrl,
}),
NotificationType.PAGE_UPDATED,
);
} else {
const isFirst = await this.rateLimiter.addToDigest(
userId,
notification.id,
);
if (isFirst) {
await this.scheduleDigest(userId, workspaceId);
}
}
}
}
@@ -255,6 +276,66 @@ export class PageNotificationService {
.map((u) => u.id);
}
private async scheduleDigest(
userId: string,
workspaceId: string,
): Promise<void> {
const jobId = `page-update-digest:${userId}`;
await this.notificationQueue
.add(
QueueJob.PAGE_UPDATE_DIGEST,
{ userId, workspaceId },
{ jobId, delay: DIGEST_DELAY_MS },
)
.catch((err) => {
this.logger.error(
`Failed to schedule digest for ${userId}: ${err.message}`,
);
});
}
async processDigest(userId: string, appUrl: string): Promise<void> {
const notificationIds = await this.rateLimiter.popDigest(userId);
if (notificationIds.length === 0) return;
const notifications = await this.db
.selectFrom('notifications')
.select(['id', 'pageId'])
.where('id', 'in', notificationIds)
.execute();
if (notifications.length === 0) return;
const pageIds = [...new Set(notifications.map((n) => n.pageId).filter(Boolean))];
const pages = await this.db
.selectFrom('pages')
.innerJoin('spaces', 'spaces.id', 'pages.spaceId')
.select([
'pages.id',
'pages.title',
'pages.slugId',
'spaces.slug as spaceSlug',
])
.where('pages.id', 'in', pageIds)
.execute();
const pageUpdates = pages.map((p) => ({
title: getPageTitle(p.title),
url: `${appUrl}/s/${p.spaceSlug}/p/${p.slugId}`,
}));
if (pageUpdates.length === 0) return;
await this.notificationService.queueEmail(
userId,
notificationIds[0],
`${pageUpdates.length} pages were updated`,
PageUpdateDigestEmail({ pageUpdates }),
NotificationType.PAGE_UPDATED,
);
}
private async getPageContext(
actorId: string,
pageId: string,
@@ -69,6 +69,7 @@ export enum QueueJob {
COMMENT_RESOLVED_NOTIFICATION = 'comment-resolved-notification',
PAGE_MENTION_NOTIFICATION = 'page-mention-notification',
PAGE_PERMISSION_GRANTED = 'page-permission-granted',
PAGE_UPDATE_DIGEST = 'page-update-digest',
AUDIT_LOG = 'audit-log',
AUDIT_CLEANUP = 'audit-cleanup',
@@ -0,0 +1,42 @@
import { Link, Section, Text } from '@react-email/components';
import * as React from 'react';
import { content, link, paragraph } from '../css/styles';
import { MailBody } from '../partials/partials';
interface PageUpdate {
title: string;
url: string;
}
interface Props {
pageUpdates: PageUpdate[];
}
export const PageUpdateDigestEmail = ({ pageUpdates }: Props) => {
return (
<MailBody>
<Section style={content}>
<Text style={paragraph}>Hi there,</Text>
<Text style={paragraph}>
The following {pageUpdates.length} pages you watch were updated:
</Text>
{pageUpdates.map((page, i) => (
<Text key={i} style={listItem}>
{'• '}
<Link href={page.url} style={link}>
{page.title}
</Link>
</Text>
))}
</Section>
</MailBody>
);
};
const listItem = {
...paragraph,
margin: '4px 0',
lineHeight: 1.4,
};
export default PageUpdateDigestEmail;