This commit is contained in:
Philipinho
2026-03-30 20:51:15 +01:00
parent cb177811c0
commit 308ce923cc
4 changed files with 5 additions and 8 deletions
@@ -12,7 +12,6 @@ import { PageRepo } from '@docmost/db/repos/page/page.repo';
import { isDeepStrictEqual } from 'node:util'; import { isDeepStrictEqual } from 'node:util';
import { CollabHistoryService } from '../services/collab-history.service'; import { CollabHistoryService } from '../services/collab-history.service';
import { WatcherService } from '../../core/watcher/watcher.service'; import { WatcherService } from '../../core/watcher/watcher.service';
import { NotificationType } from '../../core/notification/notification.constants';
@Processor(QueueName.HISTORY_QUEUE) @Processor(QueueName.HISTORY_QUEUE)
export class HistoryProcessor extends WorkerHost implements OnModuleDestroy { export class HistoryProcessor extends WorkerHost implements OnModuleDestroy {
@@ -72,7 +71,7 @@ export class HistoryProcessor extends WorkerHost implements OnModuleDestroy {
if (contributorIds.length > 0 && lastHistory?.content) { if (contributorIds.length > 0 && lastHistory?.content) {
await this.notificationQueue await this.notificationQueue
.add(NotificationType.PAGE_UPDATED, { .add(QueueJob.PAGE_UPDATED, {
pageId, pageId,
spaceId: page.spaceId, spaceId: page.spaceId,
workspaceId: page.workspaceId, workspaceId: page.workspaceId,
@@ -14,7 +14,6 @@ import {
import { CommentNotificationService } from './services/comment.notification'; import { CommentNotificationService } from './services/comment.notification';
import { PageNotificationService } from './services/page.notification'; import { PageNotificationService } from './services/page.notification';
import { DomainService } from '../../integrations/environment/domain.service'; import { DomainService } from '../../integrations/environment/domain.service';
import { NotificationType } from './notification.constants';
@Processor(QueueName.NOTIFICATION_QUEUE) @Processor(QueueName.NOTIFICATION_QUEUE)
export class NotificationProcessor export class NotificationProcessor
@@ -79,7 +78,7 @@ export class NotificationProcessor
break; break;
} }
case NotificationType.PAGE_UPDATED: { case QueueJob.PAGE_UPDATED: {
await this.pageNotificationService.processPageUpdate( await this.pageNotificationService.processPageUpdate(
job.data as IPageUpdateNotificationJob, job.data as IPageUpdateNotificationJob,
appUrl, appUrl,
@@ -24,10 +24,9 @@ export class PageUpdateEmailRateLimiter {
async addToDigest(userId: string, notificationId: string): Promise<boolean> { async addToDigest(userId: string, notificationId: string): Promise<boolean> {
const key = DIGEST_PREFIX + userId; const key = DIGEST_PREFIX + userId;
const isNew = (await this.redis.llen(key)) === 0; const len = await this.redis.rpush(key, notificationId);
await this.redis.rpush(key, notificationId);
await this.redis.expire(key, TTL_SECONDS); await this.redis.expire(key, TTL_SECONDS);
return isNew; return len === 1;
} }
async popDigest(userId: string): Promise<string[]> { async popDigest(userId: string): Promise<string[]> {
@@ -392,7 +392,7 @@ export class PageNotificationService {
await this.notificationService.queueEmail( await this.notificationService.queueEmail(
userId, userId,
notificationIds[0], notificationIds[0],
`Your digest: ${pageUpdates.length} page updates`, `Your digest: ${pageUpdates.length} page ${pageUpdates.length === 1 ? 'update' : 'updates'}`,
PageUpdateDigestEmail({ PageUpdateDigestEmail({
userName: user.name, userName: user.name,
pageUpdates, pageUpdates,