diff --git a/apps/server/src/collaboration/collaboration.module.ts b/apps/server/src/collaboration/collaboration.module.ts index f11473ac..c4444d0b 100644 --- a/apps/server/src/collaboration/collaboration.module.ts +++ b/apps/server/src/collaboration/collaboration.module.ts @@ -10,6 +10,7 @@ import { TokenModule } from '../core/auth/token.module'; import { HistoryProcessor } from './processors/history.processor'; import { LoggerExtension } from './extensions/logger.extension'; import { CollaborationHandler } from './collaboration.handler'; +import { CollabHistoryService } from './services/collab-history.service'; @Module({ providers: [ @@ -18,6 +19,7 @@ import { CollaborationHandler } from './collaboration.handler'; PersistenceExtension, LoggerExtension, HistoryProcessor, + CollabHistoryService, CollaborationHandler, ], exports: [CollaborationGateway], diff --git a/apps/server/src/collaboration/extensions/persistence.extension.ts b/apps/server/src/collaboration/extensions/persistence.extension.ts index 72aa32de..4548b40c 100644 --- a/apps/server/src/collaboration/extensions/persistence.extension.ts +++ b/apps/server/src/collaboration/extensions/persistence.extension.ts @@ -26,6 +26,7 @@ import { IPageHistoryJob, } from '../../integrations/queue/constants/queue.interface'; import { Page } from '@docmost/db/types/entity.types'; +import { CollabHistoryService } from '../services/collab-history.service'; import { HISTORY_FAST_INTERVAL, HISTORY_FAST_THRESHOLD, @@ -43,6 +44,7 @@ export class PersistenceExtension implements Extension { @InjectQueue(QueueName.GENERAL_QUEUE) private generalQueue: Queue, @InjectQueue(QueueName.AI_QUEUE) private aiQueue: Queue, @InjectQueue(QueueName.HISTORY_QUEUE) private historyQueue: Queue, + private readonly collabHistory: CollabHistoryService, ) {} async onLoadDocument(data: onLoadDocumentPayload) { @@ -108,6 +110,7 @@ export class PersistenceExtension implements Extension { } let page: Page = null; + const editingUserIds = this.consumeContributors(documentName); try { await executeTx(this.db, async (trx) => { @@ -130,13 +133,9 @@ export class PersistenceExtension implements Extension { let contributorIds = undefined; try { const existingContributors = page.contributorIds || []; - const contributorSet = this.contributors.get(documentName); - contributorSet.add(page.creatorId); - const newContributors = [...contributorSet]; contributorIds = Array.from( - new Set([...existingContributors, ...newContributors]), + new Set([...existingContributors, ...editingUserIds, page.creatorId]), ); - this.contributors.delete(documentName); } catch (err) { //this.logger.debug('Contributors error:' + err?.['message']); } @@ -160,6 +159,8 @@ export class PersistenceExtension implements Extension { } if (page) { + await this.collabHistory.addContributors(pageId, editingUserIds); + const mentions = extractMentions(tiptapJson); const pageMentions = extractPageMentions(mentions); @@ -195,6 +196,14 @@ export class PersistenceExtension implements Extension { this.contributors.delete(documentName); } + private consumeContributors(documentName: string): string[] { + const contributorSet = this.contributors.get(documentName); + if (!contributorSet) return []; + const userIds = [...contributorSet]; + this.contributors.delete(documentName); + return userIds; + } + private async enqueuePageHistory(page: Page): Promise { const pageAge = Date.now() - new Date(page.createdAt).getTime(); const delay = diff --git a/apps/server/src/collaboration/processors/history.processor.ts b/apps/server/src/collaboration/processors/history.processor.ts index 6b28e127..985f0b3e 100644 --- a/apps/server/src/collaboration/processors/history.processor.ts +++ b/apps/server/src/collaboration/processors/history.processor.ts @@ -6,6 +6,7 @@ import { IPageHistoryJob } from '../../integrations/queue/constants/queue.interf import { PageHistoryRepo } from '@docmost/db/repos/page/page-history.repo'; import { PageRepo } from '@docmost/db/repos/page/page.repo'; import { isDeepStrictEqual } from 'node:util'; +import { CollabHistoryService } from '../services/collab-history.service'; @Processor(QueueName.HISTORY_QUEUE) export class HistoryProcessor extends WorkerHost implements OnModuleDestroy { @@ -14,6 +15,7 @@ export class HistoryProcessor extends WorkerHost implements OnModuleDestroy { constructor( private readonly pageHistoryRepo: PageHistoryRepo, private readonly pageRepo: PageRepo, + private readonly collabHistory: CollabHistoryService, ) { super(); } @@ -30,6 +32,7 @@ export class HistoryProcessor extends WorkerHost implements OnModuleDestroy { if (!page) { this.logger.warn(`Page ${pageId} not found, skipping history`); + await this.collabHistory.clearContributors(pageId); return; } @@ -42,8 +45,19 @@ export class HistoryProcessor extends WorkerHost implements OnModuleDestroy { !lastHistory || !isDeepStrictEqual(lastHistory.content, page.content) ) { - await this.pageHistoryRepo.saveHistory(page); - this.logger.debug(`History created for page: ${pageId}`); + const contributorIds = + await this.collabHistory.popContributors(pageId); + + try { + await this.pageHistoryRepo.saveHistory(page, { contributorIds }); + this.logger.debug(`History created for page: ${pageId}`); + } catch (err) { + await this.collabHistory.addContributors( + pageId, + contributorIds, + ); + throw err; + } } } catch (err) { throw err; diff --git a/apps/server/src/collaboration/services/collab-history.service.ts b/apps/server/src/collaboration/services/collab-history.service.ts new file mode 100644 index 00000000..b7cf5086 --- /dev/null +++ b/apps/server/src/collaboration/services/collab-history.service.ts @@ -0,0 +1,30 @@ +import { Injectable } from '@nestjs/common'; +import { RedisService } from '@nestjs-labs/nestjs-ioredis'; +import type { Redis } from 'ioredis'; + +const REDIS_KEY_PREFIX = 'history:contributors:'; + +@Injectable() +export class CollabHistoryService { + private readonly redis: Redis; + + constructor(private readonly redisService: RedisService) { + this.redis = this.redisService.getOrThrow(); + } + + async addContributors(pageId: string, userIds: string[]): Promise { + if (userIds.length === 0) return; + await this.redis.sadd(REDIS_KEY_PREFIX + pageId, ...userIds); + } + + async popContributors(pageId: string): Promise { + const key = REDIS_KEY_PREFIX + pageId; + const count = await this.redis.scard(key); + if (count === 0) return []; + return await this.redis.spop(key, count); + } + + async clearContributors(pageId: string): Promise { + await this.redis.del(REDIS_KEY_PREFIX + pageId); + } +} diff --git a/apps/server/src/database/migrations/20260209T120000-add-contributor-ids-to-page-history.ts b/apps/server/src/database/migrations/20260209T120000-add-contributor-ids-to-page-history.ts new file mode 100644 index 00000000..4e50ad3a --- /dev/null +++ b/apps/server/src/database/migrations/20260209T120000-add-contributor-ids-to-page-history.ts @@ -0,0 +1,15 @@ +import { type Kysely, sql } from 'kysely'; + +export async function up(db: Kysely): Promise { + await db.schema + .alterTable('page_history') + .addColumn('contributor_ids', sql`uuid[]`, (col) => col.defaultTo('{}')) + .execute(); +} + +export async function down(db: Kysely): Promise { + await db.schema + .alterTable('page_history') + .dropColumn('contributor_ids') + .execute(); +} diff --git a/apps/server/src/database/repos/page/page-history.repo.ts b/apps/server/src/database/repos/page/page-history.repo.ts index c61c7e88..aca38f45 100644 --- a/apps/server/src/database/repos/page/page-history.repo.ts +++ b/apps/server/src/database/repos/page/page-history.repo.ts @@ -9,8 +9,8 @@ import { } from '@docmost/db/types/entity.types'; import { PaginationOptions } from '@docmost/db/pagination/pagination-options'; import { executeWithCursorPagination } from '@docmost/db/pagination/cursor-pagination'; -import { jsonObjectFrom } from 'kysely/helpers/postgres'; -import { ExpressionBuilder } from 'kysely'; +import { jsonArrayFrom, jsonObjectFrom } from 'kysely/helpers/postgres'; +import { ExpressionBuilder, sql } from 'kysely'; import { DB } from '@docmost/db/types/db'; @Injectable() @@ -25,6 +25,7 @@ export class PageHistoryRepo { 'icon', 'coverPhoto', 'lastUpdatedById', + 'contributorIds', 'spaceId', 'workspaceId', 'createdAt', @@ -44,6 +45,7 @@ export class PageHistoryRepo { .select(this.baseFields) .$if(opts?.includeContent, (qb) => qb.select('content')) .select((eb) => this.withLastUpdatedBy(eb)) + .select((eb) => this.withContributors(eb)) .where('id', '=', pageHistoryId) .executeTakeFirst(); } @@ -60,7 +62,10 @@ export class PageHistoryRepo { .executeTakeFirst(); } - async saveHistory(page: Page, trx?: KyselyTransaction): Promise { + async saveHistory( + page: Page, + opts?: { contributorIds?: string[]; trx?: KyselyTransaction }, + ): Promise { await this.insertPageHistory( { pageId: page.id, @@ -70,10 +75,11 @@ export class PageHistoryRepo { icon: page.icon, coverPhoto: page.coverPhoto, lastUpdatedById: page.lastUpdatedById ?? page.creatorId, + contributorIds: opts?.contributorIds, spaceId: page.spaceId, workspaceId: page.workspaceId, }, - trx, + opts?.trx, ); } @@ -82,6 +88,7 @@ export class PageHistoryRepo { .selectFrom('pageHistory') .select(this.baseFields) .select((eb) => this.withLastUpdatedBy(eb)) + .select((eb) => this.withContributors(eb)) .where('pageId', '=', pageId); return executeWithCursorPagination(query, { @@ -120,4 +127,17 @@ export class PageHistoryRepo { .whereRef('users.id', '=', 'pageHistory.lastUpdatedById'), ).as('lastUpdatedBy'); } + + withContributors(eb: ExpressionBuilder) { + return jsonArrayFrom( + eb + .selectFrom('users') + .select(['users.id', 'users.name', 'users.avatarUrl']) + .whereRef( + 'users.id', + '=', + sql`ANY(${eb.ref('pageHistory.contributorIds')})`, + ), + ).as('contributors'); + } } diff --git a/apps/server/src/database/types/db.d.ts b/apps/server/src/database/types/db.d.ts index a4197f4e..60b328ee 100644 --- a/apps/server/src/database/types/db.d.ts +++ b/apps/server/src/database/types/db.d.ts @@ -199,6 +199,7 @@ export interface GroupUsers { export interface PageHistory { content: Json | null; + contributorIds: Generated; coverPhoto: string | null; createdAt: Generated; icon: string | null;