This commit is contained in:
Philipinho
2026-03-30 20:20:51 +01:00
parent 0f22e44d29
commit a6d2140197
5 changed files with 149 additions and 43 deletions
@@ -4,8 +4,8 @@ import type { Redis } from 'ioredis';
const KEY_PREFIX = 'page-update:emails:';
const DIGEST_PREFIX = 'page-update:digest:';
const TTL_SECONDS = 86400; // 24 hours
const MAX_IMMEDIATE_EMAILS = 10;
const TTL_SECONDS = 28800; // 8 hours
const MAX_IMMEDIATE_EMAILS = 4;
@Injectable()
export class PageUpdateEmailRateLimiter {
@@ -186,8 +186,10 @@ export class PageNotificationService {
const candidateIds = watcherIds.filter((id) => !actorSet.has(id));
if (candidateIds.length === 0) return;
const afterPrefs = await this.getEligiblePageUpdateUserIds(candidateIds);
if (afterPrefs.length === 0) return;
const eligibleUsers = await this.getEligiblePageUpdateUsers(candidateIds);
if (eligibleUsers.size === 0) return;
const afterPrefs = [...eligibleUsers.keys()];
const recentlyNotified =
await this.notificationRepo.getRecentlyNotifiedUserIds(
@@ -237,6 +239,7 @@ export class PageNotificationService {
notification.id,
`${actor.name} updated ${pageTitle}`,
PageUpdateEmail({
userName: eligibleUsers.get(userId) ?? '',
actorName: actor.name,
pageTitle,
pageUrl: basePageUrl,
@@ -255,37 +258,38 @@ export class PageNotificationService {
}
}
private async getEligiblePageUpdateUserIds(
private async getEligiblePageUpdateUsers(
userIds: string[],
): Promise<string[]> {
if (userIds.length === 0) return [];
): Promise<Map<string, string>> {
if (userIds.length === 0) return new Map();
const users = await this.db
.selectFrom('users')
.select(['id', 'settings'])
.select(['id', 'name', 'settings'])
.where('id', 'in', userIds)
.where('deletedAt', 'is', null)
.where('deactivatedAt', 'is', null)
.execute();
return users
.filter((u) => {
const settings = u.settings as any;
return settings?.notifications?.['page.updated'] !== false;
})
.map((u) => u.id);
const eligible = new Map<string, string>();
for (const u of users) {
const settings = u.settings as any;
if (settings?.notifications?.['page.updated'] !== false) {
eligible.set(u.id, u.name);
}
}
return eligible;
}
private async scheduleDigest(
userId: string,
workspaceId: string,
): Promise<void> {
const jobId = `page-update-digest:${userId}`;
await this.notificationQueue
.add(
QueueJob.PAGE_UPDATE_DIGEST,
{ userId, workspaceId },
{ jobId, delay: DIGEST_DELAY_MS },
{ delay: DIGEST_DELAY_MS, removeOnComplete: true },
)
.catch((err) => {
this.logger.error(
@@ -298,40 +302,102 @@ export class PageNotificationService {
const notificationIds = await this.rateLimiter.popDigest(userId);
if (notificationIds.length === 0) return;
const notifications = await this.db
.selectFrom('notifications')
.select(['id', 'pageId'])
.where('id', 'in', notificationIds)
.execute();
const [user, notifications] = await Promise.all([
this.db
.selectFrom('users')
.select(['id', 'name'])
.where('id', '=', userId)
.executeTakeFirst(),
this.db
.selectFrom('notifications')
.select(['id', 'pageId', 'actorId'])
.where('id', 'in', notificationIds)
.execute(),
]);
if (notifications.length === 0) return;
if (!user || notifications.length === 0) return;
const pageIds = [...new Set(notifications.map((n) => n.pageId).filter(Boolean))];
const pageIds = [
...new Set(notifications.map((n) => n.pageId).filter(Boolean)),
];
const actorIds = [
...new Set(notifications.map((n) => n.actorId).filter(Boolean)),
];
const pages = await this.db
const allPages = await this.db
.selectFrom('pages')
.innerJoin('spaces', 'spaces.id', 'pages.spaceId')
.select([
'pages.id',
'pages.title',
'pages.slugId',
'pages.spaceId',
'spaces.slug as spaceSlug',
])
.where('pages.id', 'in', pageIds)
.execute();
if (allPages.length === 0) return;
const spaceIds = [...new Set(allPages.map((p) => p.spaceId))];
const accessibleSpaceIds = new Set<string>();
for (const spaceId of spaceIds) {
const usersWithAccess =
await this.spaceMemberRepo.getUserIdsWithSpaceAccess([userId], spaceId);
if (usersWithAccess.has(userId)) accessibleSpaceIds.add(spaceId);
}
const spaceFilteredPages = allPages.filter((p) =>
accessibleSpaceIds.has(p.spaceId),
);
if (spaceFilteredPages.length === 0) return;
const accessiblePageIds = new Set<string>();
for (const p of spaceFilteredPages) {
const hasAccess = await this.pagePermissionRepo.getUserIdsWithPageAccess(
p.id,
[userId],
);
if (hasAccess.includes(userId)) accessiblePageIds.add(p.id);
}
const pages = spaceFilteredPages.filter((p) => accessiblePageIds.has(p.id));
if (pages.length === 0) return;
const actors = actorIds.length > 0
? await this.db
.selectFrom('users')
.select(['id', 'name'])
.where('id', 'in', actorIds)
.execute()
: [];
const actorMap = new Map(actors.map((a) => [a.id, a.name]));
const pageActors = new Map<string, Set<string>>();
for (const n of notifications) {
if (!n.pageId || !n.actorId) continue;
const names = pageActors.get(n.pageId) ?? new Set();
const name = actorMap.get(n.actorId);
if (name) names.add(name);
pageActors.set(n.pageId, names);
}
const pageUpdates = pages.map((p) => ({
title: getPageTitle(p.title),
url: `${appUrl}/s/${p.spaceSlug}/p/${p.slugId}`,
updatedBy: [...(pageActors.get(p.id) ?? [])],
}));
if (pageUpdates.length === 0) return;
await this.notificationService.queueEmail(
userId,
notificationIds[0],
`${pageUpdates.length} pages were updated`,
PageUpdateDigestEmail({ pageUpdates }),
`Your digest: ${pageUpdates.length} page updates`,
PageUpdateDigestEmail({
userName: user.name,
pageUpdates,
totalUpdates: pageUpdates.length,
}),
NotificationType.PAGE_UPDATED,
);
}