diff --git a/apps/server/src/core/page/page.module.ts b/apps/server/src/core/page/page.module.ts index 2ea9f905..05384554 100644 --- a/apps/server/src/core/page/page.module.ts +++ b/apps/server/src/core/page/page.module.ts @@ -4,13 +4,25 @@ import { PageController } from './page.controller'; import { PageHistoryService } from './services/page-history.service'; import { TrashCleanupService } from './services/trash-cleanup.service'; import { PagePermissionService } from './services/page-permission.service'; +import { PageHierarchyService } from './services/page-hierarchy.service'; import { PagePermissionController } from './page-permission.controller'; import { StorageModule } from '../../integrations/storage/storage.module'; @Module({ controllers: [PageController, PagePermissionController], - providers: [PageService, PageHistoryService, TrashCleanupService, PagePermissionService], - exports: [PageService, PageHistoryService, PagePermissionService], + providers: [ + PageService, + PageHistoryService, + TrashCleanupService, + PagePermissionService, + PageHierarchyService, + ], + exports: [ + PageService, + PageHistoryService, + PagePermissionService, + PageHierarchyService, + ], imports: [StorageModule], }) export class PageModule {} diff --git a/apps/server/src/core/page/services/page-hierarchy.service.ts b/apps/server/src/core/page/services/page-hierarchy.service.ts new file mode 100644 index 00000000..a42ceec1 --- /dev/null +++ b/apps/server/src/core/page/services/page-hierarchy.service.ts @@ -0,0 +1,56 @@ +import { Injectable, Logger } from '@nestjs/common'; +import { InjectKysely } from 'nestjs-kysely'; +import { KyselyDB } from '@docmost/db/types/kysely.types'; +import { PageHierarchyRepo } from '@docmost/db/repos/page/page-hierarchy.repo'; +import { executeTx } from '@docmost/db/utils'; + +type RebuildResult = { rebuilt: boolean; count: number }; + +@Injectable() +export class PageHierarchyService { + private readonly logger = new Logger(PageHierarchyService.name); + + constructor( + @InjectKysely() private readonly db: KyselyDB, + private readonly pageHierarchyRepo: PageHierarchyRepo, + ) {} + + async rebuildAll(): Promise { + return executeTx(this.db, async (trx) => { + const locked = await this.pageHierarchyRepo.tryAcquireGlobalLock(trx); + if (!locked) { + this.logger.debug('Rebuild all skipped - another process holds the lock'); + return { rebuilt: false, count: 0 }; + } + + this.logger.log('Rebuilding hierarchy for all pages'); + const count = await this.pageHierarchyRepo.rebuildAll(trx); + this.logger.log(`Rebuilt hierarchy for all pages (${count} entries)`); + + return { rebuilt: true, count }; + }); + } + + async rebuildBySpace(spaceId: string): Promise { + return executeTx(this.db, async (trx) => { + const locked = await this.pageHierarchyRepo.tryAcquireSpaceLock( + spaceId, + trx, + ); + if (!locked) { + this.logger.debug( + `Rebuild for space ${spaceId} skipped - another process holds the lock`, + ); + return { rebuilt: false, count: 0 }; + } + + this.logger.log(`Rebuilding hierarchy for space ${spaceId}`); + const count = await this.pageHierarchyRepo.rebuildBySpace(spaceId, trx); + this.logger.log( + `Rebuilt hierarchy for space ${spaceId} (${count} entries)`, + ); + + return { rebuilt: true, count }; + }); + } +} diff --git a/apps/server/src/database/database.module.ts b/apps/server/src/database/database.module.ts index ea346dd1..f0553ffa 100644 --- a/apps/server/src/database/database.module.ts +++ b/apps/server/src/database/database.module.ts @@ -17,6 +17,7 @@ import { SpaceRepo } from '@docmost/db/repos/space/space.repo'; import { SpaceMemberRepo } from '@docmost/db/repos/space/space-member.repo'; import { PageRepo } from './repos/page/page.repo'; import { PagePermissionRepo } from './repos/page/page-permission.repo'; +import { PageHierarchyRepo } from './repos/page/page-hierarchy.repo'; import { CommentRepo } from './repos/comment/comment.repo'; import { PageHistoryRepo } from './repos/page/page-history.repo'; import { AttachmentRepo } from './repos/attachment/attachment.repo'; @@ -73,6 +74,7 @@ types.setTypeParser(types.builtins.INT8, (val) => Number(val)); SpaceMemberRepo, PageRepo, PagePermissionRepo, + PageHierarchyRepo, PageHistoryRepo, CommentRepo, AttachmentRepo, @@ -90,6 +92,7 @@ types.setTypeParser(types.builtins.INT8, (val) => Number(val)); SpaceMemberRepo, PageRepo, PagePermissionRepo, + PageHierarchyRepo, PageHistoryRepo, CommentRepo, AttachmentRepo, diff --git a/apps/server/src/database/repos/page/page-hierarchy.repo.ts b/apps/server/src/database/repos/page/page-hierarchy.repo.ts new file mode 100644 index 00000000..49cfd847 --- /dev/null +++ b/apps/server/src/database/repos/page/page-hierarchy.repo.ts @@ -0,0 +1,114 @@ +import { Injectable, Logger } from '@nestjs/common'; +import { InjectKysely } from 'nestjs-kysely'; +import { KyselyDB, KyselyTransaction } from '../../types/kysely.types'; +import { sql } from 'kysely'; + +@Injectable() +export class PageHierarchyRepo { + private readonly logger = new Logger(PageHierarchyRepo.name); + + constructor(@InjectKysely() private readonly db: KyselyDB) {} + + async tryAcquireGlobalLock(trx: KyselyTransaction): Promise { + const result = await sql<{ locked: boolean }>` + SELECT pg_try_advisory_xact_lock(hashtext('rebuild_page_hierarchy_global')) as locked + `.execute(trx); + return result.rows[0]?.locked ?? false; + } + + async tryAcquireSpaceLock( + spaceId: string, + trx: KyselyTransaction, + ): Promise { + const result = await sql<{ locked: boolean }>` + SELECT pg_try_advisory_xact_lock(hashtext(${'rebuild_page_hierarchy_space_' + spaceId})) as locked + `.execute(trx); + return result.rows[0]?.locked ?? false; + } + + async rebuildAll(trx: KyselyTransaction): Promise { + await trx.deleteFrom('pageHierarchy').execute(); + + const result = await trx + .withRecursive('pageTree', (qb) => + qb + .selectFrom('pages') + .select([ + 'id as ancestorId', + 'id as descendantId', + sql`0`.as('depth'), + ]) + .where('deletedAt', 'is', null) + .unionAll((eb) => + eb + .selectFrom('pages as p') + .innerJoin('pageTree as pt', 'p.parentPageId', 'pt.descendantId') + .select([ + 'pt.ancestorId', + 'p.id as descendantId', + sql`pt.depth + 1`.as('depth'), + ]) + .where('p.deletedAt', 'is', null), + ), + ) + .insertInto('pageHierarchy') + .columns(['ancestorId', 'descendantId', 'depth']) + .expression((eb) => + eb + .selectFrom('pageTree') + .select(['ancestorId', 'descendantId', 'depth']), + ) + .executeTakeFirst(); + + return Number(result?.numInsertedOrUpdatedRows ?? 0); + } + + async rebuildBySpace( + spaceId: string, + trx: KyselyTransaction, + ): Promise { + await trx + .deleteFrom('pageHierarchy') + .where( + 'descendantId', + 'in', + trx.selectFrom('pages').select('id').where('spaceId', '=', spaceId), + ) + .execute(); + + const result = await trx + .withRecursive('pageTree', (qb) => + qb + .selectFrom('pages') + .select([ + 'id as ancestorId', + 'id as descendantId', + sql`0`.as('depth'), + ]) + .where('spaceId', '=', spaceId) + .where('deletedAt', 'is', null) + .unionAll((eb) => + eb + .selectFrom('pages as p') + .innerJoin('pageTree as pt', 'p.parentPageId', 'pt.descendantId') + .select([ + 'pt.ancestorId', + 'p.id as descendantId', + sql`pt.depth + 1`.as('depth'), + ]) + .where('p.spaceId', '=', spaceId) + .where('p.deletedAt', 'is', null), + ), + ) + .insertInto('pageHierarchy') + .columns(['ancestorId', 'descendantId', 'depth']) + .expression((eb) => + eb + .selectFrom('pageTree') + .select(['ancestorId', 'descendantId', 'depth']), + ) + .executeTakeFirst(); + + return Number(result?.numInsertedOrUpdatedRows ?? 0); + } +} diff --git a/apps/server/src/integrations/queue/constants/queue.constants.ts b/apps/server/src/integrations/queue/constants/queue.constants.ts index 5c7aa29a..523d6bfd 100644 --- a/apps/server/src/integrations/queue/constants/queue.constants.ts +++ b/apps/server/src/integrations/queue/constants/queue.constants.ts @@ -6,6 +6,7 @@ export enum QueueName { FILE_TASK_QUEUE = '{file-task-queue}', SEARCH_QUEUE = '{search-queue}', AI_QUEUE = '{ai-queue}', + HIERARCHY_QUEUE = '{hierarchy-queue}', } export enum QueueJob { @@ -58,4 +59,8 @@ export enum QueueJob { GENERATE_PAGE_EMBEDDINGS = 'generate-page-embeddings', DELETE_PAGE_EMBEDDINGS = 'delete-page-embeddings', + + REBUILD_HIERARCHY_ALL = 'rebuild-hierarchy-all', + REBUILD_HIERARCHY_ALL_BY_SPACE = 'rebuild-hierarchy-all-by-space', + REBUILD_HIERARCHY_SPACE = 'rebuild-hierarchy-space', } diff --git a/apps/server/src/integrations/queue/constants/queue.interface.ts b/apps/server/src/integrations/queue/constants/queue.interface.ts index ce105f1c..593cac7e 100644 --- a/apps/server/src/integrations/queue/constants/queue.interface.ts +++ b/apps/server/src/integrations/queue/constants/queue.interface.ts @@ -9,4 +9,8 @@ export interface IPageBacklinkJob { export interface IStripeSeatsSyncJob { workspaceId: string; +} + +export interface IRebuildHierarchyJob { + spaceId?: string; } \ No newline at end of file diff --git a/apps/server/src/integrations/queue/processors/hierarchy.processor.ts b/apps/server/src/integrations/queue/processors/hierarchy.processor.ts new file mode 100644 index 00000000..590e04df --- /dev/null +++ b/apps/server/src/integrations/queue/processors/hierarchy.processor.ts @@ -0,0 +1,146 @@ +import { Logger, OnModuleDestroy } from '@nestjs/common'; +import { OnWorkerEvent, Processor, WorkerHost } from '@nestjs/bullmq'; +import { Job } from 'bullmq'; +import { InjectKysely } from 'nestjs-kysely'; +import { KyselyDB } from '@docmost/db/types/kysely.types'; +import { QueueJob, QueueName } from '../constants'; +import { IRebuildHierarchyJob } from '../constants/queue.interface'; +import { PageHierarchyRepo } from '@docmost/db/repos/page/page-hierarchy.repo'; +import { executeTx } from '@docmost/db/utils'; + +const HIERARCHY_JOBS = [ + QueueJob.REBUILD_HIERARCHY_ALL, + QueueJob.REBUILD_HIERARCHY_ALL_BY_SPACE, + QueueJob.REBUILD_HIERARCHY_SPACE, +] as const; + +@Processor(QueueName.HIERARCHY_QUEUE) +export class HierarchyProcessor extends WorkerHost implements OnModuleDestroy { + private readonly logger = new Logger(HierarchyProcessor.name); + + constructor( + @InjectKysely() private readonly db: KyselyDB, + private readonly pageHierarchyRepo: PageHierarchyRepo, + ) { + super(); + } + + async process(job: Job): Promise { + try { + switch (job.name) { + case QueueJob.REBUILD_HIERARCHY_ALL: + await this.rebuildAll(); + break; + + case QueueJob.REBUILD_HIERARCHY_ALL_BY_SPACE: + await this.rebuildAllBySpace(); + break; + + case QueueJob.REBUILD_HIERARCHY_SPACE: + if (!job.data.spaceId) { + throw new Error('spaceId is required for space rebuild'); + } + await this.rebuildBySpace(job.data.spaceId); + break; + } + } catch (err) { + throw err; + } + } + + private async rebuildAll(): Promise { + await executeTx(this.db, async (trx) => { + const locked = await this.pageHierarchyRepo.tryAcquireGlobalLock(trx); + if (!locked) { + this.logger.debug( + 'Rebuild all skipped - another process holds the lock', + ); + return; + } + + this.logger.debug('Rebuilding hierarchy for all pages'); + const count = await this.pageHierarchyRepo.rebuildAll(trx); + this.logger.debug(`Rebuilt hierarchy for all pages (${count} entries)`); + }); + } + + private async rebuildBySpace(spaceId: string): Promise { + await executeTx(this.db, async (trx) => { + const locked = await this.pageHierarchyRepo.tryAcquireSpaceLock( + spaceId, + trx, + ); + if (!locked) { + this.logger.debug( + `Rebuild for space ${spaceId} skipped - another process holds the lock`, + ); + return; + } + + this.logger.debug(`Rebuilding hierarchy for space ${spaceId}`); + const count = await this.pageHierarchyRepo.rebuildBySpace(spaceId, trx); + this.logger.debug( + `Rebuilt hierarchy for space ${spaceId} (${count} entries)`, + ); + }); + } + + private async rebuildAllBySpace(): Promise { + let lastId: string | null = null; + const BATCH_SIZE = 100; + let totalSpaces = 0; + + this.logger.debug('Starting hierarchy rebuild for all spaces'); + + while (true) { + const spaces = await this.db + .selectFrom('spaces') + .select('id') + .$if(Boolean(lastId), (qb) => qb.where('id', '>', lastId!)) + .orderBy('id', 'asc') + .limit(BATCH_SIZE) + .execute(); + + if (spaces.length === 0) break; + + for (const space of spaces) { + await this.rebuildBySpace(space.id); + totalSpaces++; + } + + lastId = spaces[spaces.length - 1].id; + this.logger.debug(`Rebuilt hierarchy for ${totalSpaces} spaces...`); + } + + this.logger.debug(`Completed hierarchy rebuild for ${totalSpaces} spaces`); + } + + @OnWorkerEvent('active') + onActive(job: Job) { + if (HIERARCHY_JOBS.includes(job.name as (typeof HIERARCHY_JOBS)[number])) { + this.logger.debug(`Processing ${job.name} job`); + } + } + + @OnWorkerEvent('failed') + onError(job: Job) { + if (HIERARCHY_JOBS.includes(job.name as (typeof HIERARCHY_JOBS)[number])) { + this.logger.error( + `Error processing ${job.name} job. Reason: ${job.failedReason}`, + ); + } + } + + @OnWorkerEvent('completed') + onCompleted(job: Job) { + if (HIERARCHY_JOBS.includes(job.name as (typeof HIERARCHY_JOBS)[number])) { + this.logger.debug(`Completed ${job.name} job`); + } + } + + async onModuleDestroy(): Promise { + if (this.worker) { + await this.worker.close(); + } + } +} diff --git a/apps/server/src/integrations/queue/queue.module.ts b/apps/server/src/integrations/queue/queue.module.ts index 6787e010..5454b86d 100644 --- a/apps/server/src/integrations/queue/queue.module.ts +++ b/apps/server/src/integrations/queue/queue.module.ts @@ -4,6 +4,7 @@ import { EnvironmentService } from '../environment/environment.service'; import { createRetryStrategy, parseRedisUrl } from '../../common/helpers'; import { QueueName } from './constants'; import { BacklinksProcessor } from './processors/backlinks.processor'; +import { HierarchyProcessor } from './processors/hierarchy.processor'; @Global() @Module({ @@ -73,8 +74,22 @@ import { BacklinksProcessor } from './processors/backlinks.processor'; attempts: 1, }, }), + BullModule.registerQueue({ + name: QueueName.HIERARCHY_QUEUE, + defaultJobOptions: { + removeOnComplete: true, + removeOnFail: { + count: 50, + }, + attempts: 3, + backoff: { + type: 'exponential', + delay: 20 * 1000, + }, + }, + }), ], exports: [BullModule], - providers: [BacklinksProcessor], + providers: [BacklinksProcessor, HierarchyProcessor], }) export class QueueModule {}