mirror of
https://github.com/docmost/docmost.git
synced 2026-06-11 02:36:56 +08:00
Merge branch 'main' into perm-x
This commit is contained in:
@@ -7,9 +7,10 @@ import { CollabWsAdapter } from './adapter/collab-ws.adapter';
|
||||
import { IncomingMessage } from 'http';
|
||||
import { WebSocket } from 'ws';
|
||||
import { TokenModule } from '../core/auth/token.module';
|
||||
import { HistoryListener } from './listeners/history.listener';
|
||||
import { HistoryProcessor } from './processors/history.processor';
|
||||
import { LoggerExtension } from './extensions/logger.extension';
|
||||
import { CollaborationHandler } from './collaboration.handler';
|
||||
import { CollabHistoryService } from './services/collab-history.service';
|
||||
|
||||
@Module({
|
||||
providers: [
|
||||
@@ -17,7 +18,8 @@ import { CollaborationHandler } from './collaboration.handler';
|
||||
AuthenticationExtension,
|
||||
PersistenceExtension,
|
||||
LoggerExtension,
|
||||
HistoryListener,
|
||||
HistoryProcessor,
|
||||
CollabHistoryService,
|
||||
CollaborationHandler,
|
||||
],
|
||||
exports: [CollaborationGateway],
|
||||
|
||||
@@ -0,0 +1,3 @@
|
||||
export const HISTORY_INTERVAL = 5 * 60 * 1000;
|
||||
export const HISTORY_FAST_INTERVAL = 60 * 1000;
|
||||
export const HISTORY_FAST_THRESHOLD = 5 * 60 * 1000;
|
||||
@@ -13,7 +13,6 @@ import { PageRepo } from '@docmost/db/repos/page/page.repo';
|
||||
import { InjectKysely } from 'nestjs-kysely';
|
||||
import { KyselyDB } from '@docmost/db/types/kysely.types';
|
||||
import { executeTx } from '@docmost/db/utils';
|
||||
import { EventEmitter2 } from '@nestjs/event-emitter';
|
||||
import { InjectQueue } from '@nestjs/bullmq';
|
||||
import { QueueJob, QueueName } from '../../integrations/queue/constants';
|
||||
import { Queue } from 'bullmq';
|
||||
@@ -22,8 +21,17 @@ import {
|
||||
extractPageMentions,
|
||||
} from '../../common/helpers/prosemirror/utils';
|
||||
import { isDeepStrictEqual } from 'node:util';
|
||||
import { IPageBacklinkJob } from '../../integrations/queue/constants/queue.interface';
|
||||
import {
|
||||
IPageBacklinkJob,
|
||||
IPageHistoryJob,
|
||||
} from '../../integrations/queue/constants/queue.interface';
|
||||
import { Page } from '@docmost/db/types/entity.types';
|
||||
import { CollabHistoryService } from '../services/collab-history.service';
|
||||
import {
|
||||
HISTORY_FAST_INTERVAL,
|
||||
HISTORY_FAST_THRESHOLD,
|
||||
HISTORY_INTERVAL,
|
||||
} from '../constants';
|
||||
|
||||
@Injectable()
|
||||
export class PersistenceExtension implements Extension {
|
||||
@@ -33,9 +41,10 @@ export class PersistenceExtension implements Extension {
|
||||
constructor(
|
||||
private readonly pageRepo: PageRepo,
|
||||
@InjectKysely() private readonly db: KyselyDB,
|
||||
private eventEmitter: EventEmitter2,
|
||||
@InjectQueue(QueueName.GENERAL_QUEUE) private generalQueue: Queue,
|
||||
@InjectQueue(QueueName.AI_QUEUE) private aiQueue: Queue,
|
||||
@InjectQueue(QueueName.HISTORY_QUEUE) private historyQueue: Queue,
|
||||
private readonly collabHistory: CollabHistoryService,
|
||||
) {}
|
||||
|
||||
async onLoadDocument(data: onLoadDocumentPayload) {
|
||||
@@ -101,6 +110,7 @@ export class PersistenceExtension implements Extension {
|
||||
}
|
||||
|
||||
let page: Page = null;
|
||||
const editingUserIds = this.consumeContributors(documentName);
|
||||
|
||||
try {
|
||||
await executeTx(this.db, async (trx) => {
|
||||
@@ -123,13 +133,9 @@ export class PersistenceExtension implements Extension {
|
||||
let contributorIds = undefined;
|
||||
try {
|
||||
const existingContributors = page.contributorIds || [];
|
||||
const contributorSet = this.contributors.get(documentName);
|
||||
contributorSet.add(page.creatorId);
|
||||
const newContributors = [...contributorSet];
|
||||
contributorIds = Array.from(
|
||||
new Set([...existingContributors, ...newContributors]),
|
||||
new Set([...existingContributors, ...editingUserIds, page.creatorId]),
|
||||
);
|
||||
this.contributors.delete(documentName);
|
||||
} catch (err) {
|
||||
//this.logger.debug('Contributors error:' + err?.['message']);
|
||||
}
|
||||
@@ -153,13 +159,7 @@ export class PersistenceExtension implements Extension {
|
||||
}
|
||||
|
||||
if (page) {
|
||||
this.eventEmitter.emit('collab.page.updated', {
|
||||
page: {
|
||||
...page,
|
||||
content: tiptapJson,
|
||||
lastUpdatedById: context.user.id,
|
||||
},
|
||||
});
|
||||
await this.collabHistory.addContributors(pageId, editingUserIds);
|
||||
|
||||
const mentions = extractMentions(tiptapJson);
|
||||
const pageMentions = extractPageMentions(mentions);
|
||||
@@ -174,6 +174,8 @@ export class PersistenceExtension implements Extension {
|
||||
pageIds: [pageId],
|
||||
workspaceId: page.workspaceId,
|
||||
});
|
||||
|
||||
await this.enqueuePageHistory(page);
|
||||
}
|
||||
}
|
||||
|
||||
@@ -193,4 +195,26 @@ export class PersistenceExtension implements Extension {
|
||||
const documentName = data.documentName;
|
||||
this.contributors.delete(documentName);
|
||||
}
|
||||
|
||||
private consumeContributors(documentName: string): string[] {
|
||||
const contributorSet = this.contributors.get(documentName);
|
||||
if (!contributorSet) return [];
|
||||
const userIds = [...contributorSet];
|
||||
this.contributors.delete(documentName);
|
||||
return userIds;
|
||||
}
|
||||
|
||||
private async enqueuePageHistory(page: Page): Promise<void> {
|
||||
const pageAge = Date.now() - new Date(page.createdAt).getTime();
|
||||
const delay =
|
||||
pageAge < HISTORY_FAST_THRESHOLD
|
||||
? HISTORY_FAST_INTERVAL
|
||||
: HISTORY_INTERVAL;
|
||||
|
||||
await this.historyQueue.add(
|
||||
QueueJob.PAGE_HISTORY,
|
||||
{ pageId: page.id } as IPageHistoryJob,
|
||||
{ jobId: page.id, delay },
|
||||
);
|
||||
}
|
||||
}
|
||||
|
||||
@@ -1,44 +0,0 @@
|
||||
import { Injectable, Logger } from '@nestjs/common';
|
||||
import { OnEvent } from '@nestjs/event-emitter';
|
||||
import { PageHistoryRepo } from '@docmost/db/repos/page/page-history.repo';
|
||||
import { Page } from '@docmost/db/types/entity.types';
|
||||
import { isDeepStrictEqual } from 'node:util';
|
||||
|
||||
export class UpdatedPageEvent {
|
||||
page: Page;
|
||||
}
|
||||
|
||||
@Injectable()
|
||||
export class HistoryListener {
|
||||
private readonly logger = new Logger(HistoryListener.name);
|
||||
|
||||
constructor(private readonly pageHistoryRepo: PageHistoryRepo) {}
|
||||
|
||||
@OnEvent('collab.page.updated')
|
||||
async handleCreatePageHistory(event: UpdatedPageEvent) {
|
||||
const { page } = event;
|
||||
|
||||
const pageCreationTime = new Date(page.createdAt).getTime();
|
||||
const currentTime = Date.now();
|
||||
const FIVE_MINUTES = 5 * 60 * 1000;
|
||||
|
||||
if (currentTime - pageCreationTime < FIVE_MINUTES) {
|
||||
return;
|
||||
}
|
||||
|
||||
const lastHistory = await this.pageHistoryRepo.findPageLastHistory(page.id);
|
||||
|
||||
if (
|
||||
!lastHistory ||
|
||||
(!isDeepStrictEqual(lastHistory.content, page.content) &&
|
||||
currentTime - new Date(lastHistory.createdAt).getTime() >= FIVE_MINUTES)
|
||||
) {
|
||||
try {
|
||||
await this.pageHistoryRepo.saveHistory(page);
|
||||
this.logger.debug(`New history created for: ${page.id}`);
|
||||
} catch (err) {
|
||||
this.logger.error(`Failed to create history for page: ${page.id}`, err);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -0,0 +1,84 @@
|
||||
import { Logger, OnModuleDestroy } from '@nestjs/common';
|
||||
import { OnWorkerEvent, Processor, WorkerHost } from '@nestjs/bullmq';
|
||||
import { Job } from 'bullmq';
|
||||
import { QueueJob, QueueName } from '../../integrations/queue/constants';
|
||||
import { IPageHistoryJob } from '../../integrations/queue/constants/queue.interface';
|
||||
import { PageHistoryRepo } from '@docmost/db/repos/page/page-history.repo';
|
||||
import { PageRepo } from '@docmost/db/repos/page/page.repo';
|
||||
import { isDeepStrictEqual } from 'node:util';
|
||||
import { CollabHistoryService } from '../services/collab-history.service';
|
||||
|
||||
@Processor(QueueName.HISTORY_QUEUE)
|
||||
export class HistoryProcessor extends WorkerHost implements OnModuleDestroy {
|
||||
private readonly logger = new Logger(HistoryProcessor.name);
|
||||
|
||||
constructor(
|
||||
private readonly pageHistoryRepo: PageHistoryRepo,
|
||||
private readonly pageRepo: PageRepo,
|
||||
private readonly collabHistory: CollabHistoryService,
|
||||
) {
|
||||
super();
|
||||
}
|
||||
|
||||
async process(job: Job<IPageHistoryJob, void>): Promise<void> {
|
||||
if (job.name !== QueueJob.PAGE_HISTORY) return;
|
||||
|
||||
try {
|
||||
const { pageId } = job.data;
|
||||
|
||||
const page = await this.pageRepo.findById(pageId, {
|
||||
includeContent: true,
|
||||
});
|
||||
|
||||
if (!page) {
|
||||
this.logger.warn(`Page ${pageId} not found, skipping history`);
|
||||
await this.collabHistory.clearContributors(pageId);
|
||||
return;
|
||||
}
|
||||
|
||||
const lastHistory = await this.pageHistoryRepo.findPageLastHistory(
|
||||
pageId,
|
||||
{ includeContent: true },
|
||||
);
|
||||
|
||||
if (
|
||||
!lastHistory ||
|
||||
!isDeepStrictEqual(lastHistory.content, page.content)
|
||||
) {
|
||||
const contributorIds =
|
||||
await this.collabHistory.popContributors(pageId);
|
||||
|
||||
try {
|
||||
await this.pageHistoryRepo.saveHistory(page, { contributorIds });
|
||||
this.logger.debug(`History created for page: ${pageId}`);
|
||||
} catch (err) {
|
||||
await this.collabHistory.addContributors(
|
||||
pageId,
|
||||
contributorIds,
|
||||
);
|
||||
throw err;
|
||||
}
|
||||
}
|
||||
} catch (err) {
|
||||
throw err;
|
||||
}
|
||||
}
|
||||
|
||||
@OnWorkerEvent('active')
|
||||
onActive(job: Job) {
|
||||
this.logger.debug(`Processing ${job.name} for page: ${job.data.pageId}`);
|
||||
}
|
||||
|
||||
@OnWorkerEvent('failed')
|
||||
onError(job: Job) {
|
||||
this.logger.error(
|
||||
`Failed ${job.name} for page: ${job.data.pageId}. Reason: ${job.failedReason}`,
|
||||
);
|
||||
}
|
||||
|
||||
async onModuleDestroy(): Promise<void> {
|
||||
if (this.worker) {
|
||||
await this.worker.close();
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -9,6 +9,8 @@ import { EventEmitterModule } from '@nestjs/event-emitter';
|
||||
import { HealthModule } from '../../integrations/health/health.module';
|
||||
import { CollaborationController } from './collaboration.controller';
|
||||
import { LoggerModule } from '../../common/logger/logger.module';
|
||||
import { RedisModule } from '@nestjs-labs/nestjs-ioredis';
|
||||
import { RedisConfigService } from '../../integrations/redis/redis-config.service';
|
||||
|
||||
@Module({
|
||||
imports: [
|
||||
@@ -19,6 +21,9 @@ import { LoggerModule } from '../../common/logger/logger.module';
|
||||
QueueModule,
|
||||
HealthModule,
|
||||
EventEmitterModule.forRoot(),
|
||||
RedisModule.forRootAsync({
|
||||
useClass: RedisConfigService,
|
||||
}),
|
||||
],
|
||||
controllers: [
|
||||
AppController,
|
||||
|
||||
@@ -0,0 +1,30 @@
|
||||
import { Injectable } from '@nestjs/common';
|
||||
import { RedisService } from '@nestjs-labs/nestjs-ioredis';
|
||||
import type { Redis } from 'ioredis';
|
||||
|
||||
const REDIS_KEY_PREFIX = 'history:contributors:';
|
||||
|
||||
@Injectable()
|
||||
export class CollabHistoryService {
|
||||
private readonly redis: Redis;
|
||||
|
||||
constructor(private readonly redisService: RedisService) {
|
||||
this.redis = this.redisService.getOrThrow();
|
||||
}
|
||||
|
||||
async addContributors(pageId: string, userIds: string[]): Promise<void> {
|
||||
if (userIds.length === 0) return;
|
||||
await this.redis.sadd(REDIS_KEY_PREFIX + pageId, ...userIds);
|
||||
}
|
||||
|
||||
async popContributors(pageId: string): Promise<string[]> {
|
||||
const key = REDIS_KEY_PREFIX + pageId;
|
||||
const count = await this.redis.scard(key);
|
||||
if (count === 0) return [];
|
||||
return await this.redis.spop(key, count);
|
||||
}
|
||||
|
||||
async clearContributors(pageId: string): Promise<void> {
|
||||
await this.redis.del(REDIS_KEY_PREFIX + pageId);
|
||||
}
|
||||
}
|
||||
Reference in New Issue
Block a user