mirror of
https://github.com/docmost/docmost.git
synced 2026-05-07 06:23:06 +08:00
feat: save multiple version contributors
This commit is contained in:
@@ -10,6 +10,7 @@ import { TokenModule } from '../core/auth/token.module';
|
||||
import { HistoryProcessor } from './processors/history.processor';
|
||||
import { LoggerExtension } from './extensions/logger.extension';
|
||||
import { CollaborationHandler } from './collaboration.handler';
|
||||
import { CollabHistoryService } from './services/collab-history.service';
|
||||
|
||||
@Module({
|
||||
providers: [
|
||||
@@ -18,6 +19,7 @@ import { CollaborationHandler } from './collaboration.handler';
|
||||
PersistenceExtension,
|
||||
LoggerExtension,
|
||||
HistoryProcessor,
|
||||
CollabHistoryService,
|
||||
CollaborationHandler,
|
||||
],
|
||||
exports: [CollaborationGateway],
|
||||
|
||||
@@ -26,6 +26,7 @@ import {
|
||||
IPageHistoryJob,
|
||||
} from '../../integrations/queue/constants/queue.interface';
|
||||
import { Page } from '@docmost/db/types/entity.types';
|
||||
import { CollabHistoryService } from '../services/collab-history.service';
|
||||
import {
|
||||
HISTORY_FAST_INTERVAL,
|
||||
HISTORY_FAST_THRESHOLD,
|
||||
@@ -43,6 +44,7 @@ export class PersistenceExtension implements Extension {
|
||||
@InjectQueue(QueueName.GENERAL_QUEUE) private generalQueue: Queue,
|
||||
@InjectQueue(QueueName.AI_QUEUE) private aiQueue: Queue,
|
||||
@InjectQueue(QueueName.HISTORY_QUEUE) private historyQueue: Queue,
|
||||
private readonly collabHistory: CollabHistoryService,
|
||||
) {}
|
||||
|
||||
async onLoadDocument(data: onLoadDocumentPayload) {
|
||||
@@ -108,6 +110,7 @@ export class PersistenceExtension implements Extension {
|
||||
}
|
||||
|
||||
let page: Page = null;
|
||||
const editingUserIds = this.consumeContributors(documentName);
|
||||
|
||||
try {
|
||||
await executeTx(this.db, async (trx) => {
|
||||
@@ -130,13 +133,9 @@ export class PersistenceExtension implements Extension {
|
||||
let contributorIds = undefined;
|
||||
try {
|
||||
const existingContributors = page.contributorIds || [];
|
||||
const contributorSet = this.contributors.get(documentName);
|
||||
contributorSet.add(page.creatorId);
|
||||
const newContributors = [...contributorSet];
|
||||
contributorIds = Array.from(
|
||||
new Set([...existingContributors, ...newContributors]),
|
||||
new Set([...existingContributors, ...editingUserIds, page.creatorId]),
|
||||
);
|
||||
this.contributors.delete(documentName);
|
||||
} catch (err) {
|
||||
//this.logger.debug('Contributors error:' + err?.['message']);
|
||||
}
|
||||
@@ -160,6 +159,8 @@ export class PersistenceExtension implements Extension {
|
||||
}
|
||||
|
||||
if (page) {
|
||||
await this.collabHistory.addContributors(pageId, editingUserIds);
|
||||
|
||||
const mentions = extractMentions(tiptapJson);
|
||||
const pageMentions = extractPageMentions(mentions);
|
||||
|
||||
@@ -195,6 +196,14 @@ export class PersistenceExtension implements Extension {
|
||||
this.contributors.delete(documentName);
|
||||
}
|
||||
|
||||
private consumeContributors(documentName: string): string[] {
|
||||
const contributorSet = this.contributors.get(documentName);
|
||||
if (!contributorSet) return [];
|
||||
const userIds = [...contributorSet];
|
||||
this.contributors.delete(documentName);
|
||||
return userIds;
|
||||
}
|
||||
|
||||
private async enqueuePageHistory(page: Page): Promise<void> {
|
||||
const pageAge = Date.now() - new Date(page.createdAt).getTime();
|
||||
const delay =
|
||||
|
||||
@@ -6,6 +6,7 @@ import { IPageHistoryJob } from '../../integrations/queue/constants/queue.interf
|
||||
import { PageHistoryRepo } from '@docmost/db/repos/page/page-history.repo';
|
||||
import { PageRepo } from '@docmost/db/repos/page/page.repo';
|
||||
import { isDeepStrictEqual } from 'node:util';
|
||||
import { CollabHistoryService } from '../services/collab-history.service';
|
||||
|
||||
@Processor(QueueName.HISTORY_QUEUE)
|
||||
export class HistoryProcessor extends WorkerHost implements OnModuleDestroy {
|
||||
@@ -14,6 +15,7 @@ export class HistoryProcessor extends WorkerHost implements OnModuleDestroy {
|
||||
constructor(
|
||||
private readonly pageHistoryRepo: PageHistoryRepo,
|
||||
private readonly pageRepo: PageRepo,
|
||||
private readonly collabHistory: CollabHistoryService,
|
||||
) {
|
||||
super();
|
||||
}
|
||||
@@ -30,6 +32,7 @@ export class HistoryProcessor extends WorkerHost implements OnModuleDestroy {
|
||||
|
||||
if (!page) {
|
||||
this.logger.warn(`Page ${pageId} not found, skipping history`);
|
||||
await this.collabHistory.clearContributors(pageId);
|
||||
return;
|
||||
}
|
||||
|
||||
@@ -42,8 +45,19 @@ export class HistoryProcessor extends WorkerHost implements OnModuleDestroy {
|
||||
!lastHistory ||
|
||||
!isDeepStrictEqual(lastHistory.content, page.content)
|
||||
) {
|
||||
await this.pageHistoryRepo.saveHistory(page);
|
||||
const contributorIds =
|
||||
await this.collabHistory.popContributors(pageId);
|
||||
|
||||
try {
|
||||
await this.pageHistoryRepo.saveHistory(page, { contributorIds });
|
||||
this.logger.debug(`History created for page: ${pageId}`);
|
||||
} catch (err) {
|
||||
await this.collabHistory.addContributors(
|
||||
pageId,
|
||||
contributorIds,
|
||||
);
|
||||
throw err;
|
||||
}
|
||||
}
|
||||
} catch (err) {
|
||||
throw err;
|
||||
|
||||
@@ -0,0 +1,30 @@
|
||||
import { Injectable } from '@nestjs/common';
|
||||
import { RedisService } from '@nestjs-labs/nestjs-ioredis';
|
||||
import type { Redis } from 'ioredis';
|
||||
|
||||
const REDIS_KEY_PREFIX = 'history:contributors:';
|
||||
|
||||
@Injectable()
|
||||
export class CollabHistoryService {
|
||||
private readonly redis: Redis;
|
||||
|
||||
constructor(private readonly redisService: RedisService) {
|
||||
this.redis = this.redisService.getOrThrow();
|
||||
}
|
||||
|
||||
async addContributors(pageId: string, userIds: string[]): Promise<void> {
|
||||
if (userIds.length === 0) return;
|
||||
await this.redis.sadd(REDIS_KEY_PREFIX + pageId, ...userIds);
|
||||
}
|
||||
|
||||
async popContributors(pageId: string): Promise<string[]> {
|
||||
const key = REDIS_KEY_PREFIX + pageId;
|
||||
const count = await this.redis.scard(key);
|
||||
if (count === 0) return [];
|
||||
return await this.redis.spop(key, count);
|
||||
}
|
||||
|
||||
async clearContributors(pageId: string): Promise<void> {
|
||||
await this.redis.del(REDIS_KEY_PREFIX + pageId);
|
||||
}
|
||||
}
|
||||
+15
@@ -0,0 +1,15 @@
|
||||
import { type Kysely, sql } from 'kysely';
|
||||
|
||||
export async function up(db: Kysely<any>): Promise<void> {
|
||||
await db.schema
|
||||
.alterTable('page_history')
|
||||
.addColumn('contributor_ids', sql`uuid[]`, (col) => col.defaultTo('{}'))
|
||||
.execute();
|
||||
}
|
||||
|
||||
export async function down(db: Kysely<any>): Promise<void> {
|
||||
await db.schema
|
||||
.alterTable('page_history')
|
||||
.dropColumn('contributor_ids')
|
||||
.execute();
|
||||
}
|
||||
@@ -9,8 +9,8 @@ import {
|
||||
} from '@docmost/db/types/entity.types';
|
||||
import { PaginationOptions } from '@docmost/db/pagination/pagination-options';
|
||||
import { executeWithCursorPagination } from '@docmost/db/pagination/cursor-pagination';
|
||||
import { jsonObjectFrom } from 'kysely/helpers/postgres';
|
||||
import { ExpressionBuilder } from 'kysely';
|
||||
import { jsonArrayFrom, jsonObjectFrom } from 'kysely/helpers/postgres';
|
||||
import { ExpressionBuilder, sql } from 'kysely';
|
||||
import { DB } from '@docmost/db/types/db';
|
||||
|
||||
@Injectable()
|
||||
@@ -25,6 +25,7 @@ export class PageHistoryRepo {
|
||||
'icon',
|
||||
'coverPhoto',
|
||||
'lastUpdatedById',
|
||||
'contributorIds',
|
||||
'spaceId',
|
||||
'workspaceId',
|
||||
'createdAt',
|
||||
@@ -44,6 +45,7 @@ export class PageHistoryRepo {
|
||||
.select(this.baseFields)
|
||||
.$if(opts?.includeContent, (qb) => qb.select('content'))
|
||||
.select((eb) => this.withLastUpdatedBy(eb))
|
||||
.select((eb) => this.withContributors(eb))
|
||||
.where('id', '=', pageHistoryId)
|
||||
.executeTakeFirst();
|
||||
}
|
||||
@@ -60,7 +62,10 @@ export class PageHistoryRepo {
|
||||
.executeTakeFirst();
|
||||
}
|
||||
|
||||
async saveHistory(page: Page, trx?: KyselyTransaction): Promise<void> {
|
||||
async saveHistory(
|
||||
page: Page,
|
||||
opts?: { contributorIds?: string[]; trx?: KyselyTransaction },
|
||||
): Promise<void> {
|
||||
await this.insertPageHistory(
|
||||
{
|
||||
pageId: page.id,
|
||||
@@ -70,10 +75,11 @@ export class PageHistoryRepo {
|
||||
icon: page.icon,
|
||||
coverPhoto: page.coverPhoto,
|
||||
lastUpdatedById: page.lastUpdatedById ?? page.creatorId,
|
||||
contributorIds: opts?.contributorIds,
|
||||
spaceId: page.spaceId,
|
||||
workspaceId: page.workspaceId,
|
||||
},
|
||||
trx,
|
||||
opts?.trx,
|
||||
);
|
||||
}
|
||||
|
||||
@@ -82,6 +88,7 @@ export class PageHistoryRepo {
|
||||
.selectFrom('pageHistory')
|
||||
.select(this.baseFields)
|
||||
.select((eb) => this.withLastUpdatedBy(eb))
|
||||
.select((eb) => this.withContributors(eb))
|
||||
.where('pageId', '=', pageId);
|
||||
|
||||
return executeWithCursorPagination(query, {
|
||||
@@ -120,4 +127,17 @@ export class PageHistoryRepo {
|
||||
.whereRef('users.id', '=', 'pageHistory.lastUpdatedById'),
|
||||
).as('lastUpdatedBy');
|
||||
}
|
||||
|
||||
withContributors(eb: ExpressionBuilder<DB, 'pageHistory'>) {
|
||||
return jsonArrayFrom(
|
||||
eb
|
||||
.selectFrom('users')
|
||||
.select(['users.id', 'users.name', 'users.avatarUrl'])
|
||||
.whereRef(
|
||||
'users.id',
|
||||
'=',
|
||||
sql`ANY(${eb.ref('pageHistory.contributorIds')})`,
|
||||
),
|
||||
).as('contributors');
|
||||
}
|
||||
}
|
||||
|
||||
+1
@@ -199,6 +199,7 @@ export interface GroupUsers {
|
||||
|
||||
export interface PageHistory {
|
||||
content: Json | null;
|
||||
contributorIds: Generated<string[] | null>;
|
||||
coverPhoto: string | null;
|
||||
createdAt: Generated<Timestamp>;
|
||||
icon: string | null;
|
||||
|
||||
Reference in New Issue
Block a user