From 0698590c8a6028b70ab5aa2100eb7e6566256f57 Mon Sep 17 00:00:00 2001 From: Philipinho <16838612+Philipinho@users.noreply.github.com> Date: Mon, 30 Mar 2026 13:50:54 +0100 Subject: [PATCH] digests --- .../core/notification/notification.module.ts | 2 + .../notification/notification.processor.ts | 6 + .../page-update-email-rate-limiter.ts | 44 ++++++++ .../services/page.notification.ts | 105 ++++++++++++++++-- apps/server/src/ee | 2 +- .../queue/constants/queue.constants.ts | 1 + .../emails/page-update-digest-email.tsx | 42 +++++++ 7 files changed, 189 insertions(+), 13 deletions(-) create mode 100644 apps/server/src/core/notification/services/page-update-email-rate-limiter.ts create mode 100644 apps/server/src/integrations/transactional/emails/page-update-digest-email.tsx diff --git a/apps/server/src/core/notification/notification.module.ts b/apps/server/src/core/notification/notification.module.ts index a142eaf8..83778294 100644 --- a/apps/server/src/core/notification/notification.module.ts +++ b/apps/server/src/core/notification/notification.module.ts @@ -4,6 +4,7 @@ import { NotificationController } from './notification.controller'; import { NotificationProcessor } from './notification.processor'; import { CommentNotificationService } from './services/comment.notification'; import { PageNotificationService } from './services/page.notification'; +import { PageUpdateEmailRateLimiter } from './services/page-update-email-rate-limiter'; @Module({ imports: [], @@ -13,6 +14,7 @@ import { PageNotificationService } from './services/page.notification'; NotificationProcessor, CommentNotificationService, PageNotificationService, + PageUpdateEmailRateLimiter, ], exports: [NotificationService], }) diff --git a/apps/server/src/core/notification/notification.processor.ts b/apps/server/src/core/notification/notification.processor.ts index 417183c5..f24503a1 100644 --- a/apps/server/src/core/notification/notification.processor.ts +++ b/apps/server/src/core/notification/notification.processor.ts @@ -87,6 +87,12 @@ export class NotificationProcessor break; } + case QueueJob.PAGE_UPDATE_DIGEST: { + const { userId } = job.data as unknown as { userId: string }; + await this.pageNotificationService.processDigest(userId, appUrl); + break; + } + default: this.logger.warn(`Unknown notification job: ${job.name}`); } diff --git a/apps/server/src/core/notification/services/page-update-email-rate-limiter.ts b/apps/server/src/core/notification/services/page-update-email-rate-limiter.ts new file mode 100644 index 00000000..29841c37 --- /dev/null +++ b/apps/server/src/core/notification/services/page-update-email-rate-limiter.ts @@ -0,0 +1,44 @@ +import { Injectable } from '@nestjs/common'; +import { RedisService } from '@nestjs-labs/nestjs-ioredis'; +import type { Redis } from 'ioredis'; + +const KEY_PREFIX = 'page-update:emails:'; +const DIGEST_PREFIX = 'page-update:digest:'; +const TTL_SECONDS = 86400; // 24 hours +const MAX_IMMEDIATE_EMAILS = 10; + +@Injectable() +export class PageUpdateEmailRateLimiter { + private readonly redis: Redis; + + constructor(private readonly redisService: RedisService) { + this.redis = this.redisService.getOrThrow(); + } + + async canSendEmail(userId: string): Promise { + const key = KEY_PREFIX + userId; + const count = await this.redis.incr(key); + await this.redis.expire(key, TTL_SECONDS, 'NX'); + return count <= MAX_IMMEDIATE_EMAILS; + } + + async addToDigest(userId: string, notificationId: string): Promise { + const key = DIGEST_PREFIX + userId; + const isNew = (await this.redis.llen(key)) === 0; + await this.redis.rpush(key, notificationId); + await this.redis.expire(key, TTL_SECONDS); + return isNew; + } + + async popDigest(userId: string): Promise { + const key = DIGEST_PREFIX + userId; + const [ids] = await this.redis + .multi() + .lrange(key, 0, -1) + .del(key) + .exec(); + + return (ids?.[1] as string[]) ?? []; + } + +} diff --git a/apps/server/src/core/notification/services/page.notification.ts b/apps/server/src/core/notification/services/page.notification.ts index e3b15ac9..651a648b 100644 --- a/apps/server/src/core/notification/services/page.notification.ts +++ b/apps/server/src/core/notification/services/page.notification.ts @@ -1,5 +1,7 @@ -import { Injectable } from '@nestjs/common'; +import { Injectable, Logger } from '@nestjs/common'; import { InjectKysely } from 'nestjs-kysely'; +import { InjectQueue } from '@nestjs/bullmq'; +import { Queue } from 'bullmq'; import { KyselyDB } from '@docmost/db/types/kysely.types'; import { IPageMentionNotificationJob, @@ -12,15 +14,21 @@ import { NotificationRepo } from '@docmost/db/repos/notification/notification.re import { SpaceMemberRepo } from '@docmost/db/repos/space/space-member.repo'; import { PagePermissionRepo } from '@docmost/db/repos/page/page-permission.repo'; import { WatcherRepo } from '@docmost/db/repos/watcher/watcher.repo'; +import { PageUpdateEmailRateLimiter } from './page-update-email-rate-limiter'; import { PageMentionEmail } from '@docmost/transactional/emails/page-mention-email'; import { PageUpdateEmail } from '@docmost/transactional/emails/page-update-email'; +import { PageUpdateDigestEmail } from '@docmost/transactional/emails/page-update-digest-email'; import { PermissionGrantedEmail } from '@docmost/transactional/emails/permission-granted-email'; import { getPageTitle } from '../../../common/helpers'; +import { QueueJob, QueueName } from '../../../integrations/queue/constants'; const PAGE_UPDATE_COOLDOWN_HOURS = 7; +const DIGEST_DELAY_MS = 3 * 60 * 60 * 1000; // 3 hours @Injectable() export class PageNotificationService { + private readonly logger = new Logger(PageNotificationService.name); + constructor( @InjectKysely() private readonly db: KyselyDB, private readonly notificationService: NotificationService, @@ -28,6 +36,8 @@ export class PageNotificationService { private readonly spaceMemberRepo: SpaceMemberRepo, private readonly pagePermissionRepo: PagePermissionRepo, private readonly watcherRepo: WatcherRepo, + private readonly rateLimiter: PageUpdateEmailRateLimiter, + @InjectQueue(QueueName.NOTIFICATION_QUEUE) private notificationQueue: Queue, ) {} async processPageMention(data: IPageMentionNotificationJob, appUrl: string) { @@ -220,17 +230,28 @@ export class PageNotificationService { }); if (!notification) continue; - await this.notificationService.queueEmail( - userId, - notification.id, - `${actor.name} updated ${pageTitle}`, - PageUpdateEmail({ - actorName: actor.name, - pageTitle, - pageUrl: basePageUrl, - }), - NotificationType.PAGE_UPDATED, - ); + const canSend = await this.rateLimiter.canSendEmail(userId); + if (canSend) { + await this.notificationService.queueEmail( + userId, + notification.id, + `${actor.name} updated ${pageTitle}`, + PageUpdateEmail({ + actorName: actor.name, + pageTitle, + pageUrl: basePageUrl, + }), + NotificationType.PAGE_UPDATED, + ); + } else { + const isFirst = await this.rateLimiter.addToDigest( + userId, + notification.id, + ); + if (isFirst) { + await this.scheduleDigest(userId, workspaceId); + } + } } } @@ -255,6 +276,66 @@ export class PageNotificationService { .map((u) => u.id); } + private async scheduleDigest( + userId: string, + workspaceId: string, + ): Promise { + const jobId = `page-update-digest:${userId}`; + await this.notificationQueue + .add( + QueueJob.PAGE_UPDATE_DIGEST, + { userId, workspaceId }, + { jobId, delay: DIGEST_DELAY_MS }, + ) + .catch((err) => { + this.logger.error( + `Failed to schedule digest for ${userId}: ${err.message}`, + ); + }); + } + + async processDigest(userId: string, appUrl: string): Promise { + const notificationIds = await this.rateLimiter.popDigest(userId); + if (notificationIds.length === 0) return; + + const notifications = await this.db + .selectFrom('notifications') + .select(['id', 'pageId']) + .where('id', 'in', notificationIds) + .execute(); + + if (notifications.length === 0) return; + + const pageIds = [...new Set(notifications.map((n) => n.pageId).filter(Boolean))]; + + const pages = await this.db + .selectFrom('pages') + .innerJoin('spaces', 'spaces.id', 'pages.spaceId') + .select([ + 'pages.id', + 'pages.title', + 'pages.slugId', + 'spaces.slug as spaceSlug', + ]) + .where('pages.id', 'in', pageIds) + .execute(); + + const pageUpdates = pages.map((p) => ({ + title: getPageTitle(p.title), + url: `${appUrl}/s/${p.spaceSlug}/p/${p.slugId}`, + })); + + if (pageUpdates.length === 0) return; + + await this.notificationService.queueEmail( + userId, + notificationIds[0], + `${pageUpdates.length} pages were updated`, + PageUpdateDigestEmail({ pageUpdates }), + NotificationType.PAGE_UPDATED, + ); + } + private async getPageContext( actorId: string, pageId: string, diff --git a/apps/server/src/ee b/apps/server/src/ee index f4867260..05f1c816 160000 --- a/apps/server/src/ee +++ b/apps/server/src/ee @@ -1 +1 @@ -Subproject commit f48672608889233c0247c6d4ef7fcddd29540315 +Subproject commit 05f1c816a839072efc1143cce71322a9ed6b4a0a diff --git a/apps/server/src/integrations/queue/constants/queue.constants.ts b/apps/server/src/integrations/queue/constants/queue.constants.ts index 1c66a5f3..92d15426 100644 --- a/apps/server/src/integrations/queue/constants/queue.constants.ts +++ b/apps/server/src/integrations/queue/constants/queue.constants.ts @@ -69,6 +69,7 @@ export enum QueueJob { COMMENT_RESOLVED_NOTIFICATION = 'comment-resolved-notification', PAGE_MENTION_NOTIFICATION = 'page-mention-notification', PAGE_PERMISSION_GRANTED = 'page-permission-granted', + PAGE_UPDATE_DIGEST = 'page-update-digest', AUDIT_LOG = 'audit-log', AUDIT_CLEANUP = 'audit-cleanup', diff --git a/apps/server/src/integrations/transactional/emails/page-update-digest-email.tsx b/apps/server/src/integrations/transactional/emails/page-update-digest-email.tsx new file mode 100644 index 00000000..72066b68 --- /dev/null +++ b/apps/server/src/integrations/transactional/emails/page-update-digest-email.tsx @@ -0,0 +1,42 @@ +import { Link, Section, Text } from '@react-email/components'; +import * as React from 'react'; +import { content, link, paragraph } from '../css/styles'; +import { MailBody } from '../partials/partials'; + +interface PageUpdate { + title: string; + url: string; +} + +interface Props { + pageUpdates: PageUpdate[]; +} + +export const PageUpdateDigestEmail = ({ pageUpdates }: Props) => { + return ( + +
+ Hi there, + + The following {pageUpdates.length} pages you watch were updated: + + {pageUpdates.map((page, i) => ( + + {'• '} + + {page.title} + + + ))} +
+
+ ); +}; + +const listItem = { + ...paragraph, + margin: '4px 0', + lineHeight: 1.4, +}; + +export default PageUpdateDigestEmail;