WIP 6 - rebuilding: add page-hierarchy closure-table rebuild service, repository, and queue processor

This commit is contained in:
Philipinho
2026-01-04 17:59:46 +00:00
parent 29658b0572
commit 8d9aa3b3aa
8 changed files with 358 additions and 3 deletions
+14 -2
View File
@@ -4,13 +4,25 @@ import { PageController } from './page.controller';
import { PageHistoryService } from './services/page-history.service';
import { TrashCleanupService } from './services/trash-cleanup.service';
import { PagePermissionService } from './services/page-permission.service';
import { PageHierarchyService } from './services/page-hierarchy.service';
import { PagePermissionController } from './page-permission.controller';
import { StorageModule } from '../../integrations/storage/storage.module';
/**
 * Page feature module: wires the page controllers with the page services,
 * including the new PageHierarchyService for closure-table rebuilds.
 */
@Module({
  controllers: [PageController, PagePermissionController],
  // NOTE(review): removed the stale pre-change single-line `providers`/`exports`
  // entries that duplicated the keys below (duplicate object keys are invalid TS).
  providers: [
    PageService,
    PageHistoryService,
    TrashCleanupService,
    PagePermissionService,
    PageHierarchyService,
  ],
  exports: [
    PageService,
    PageHistoryService,
    PagePermissionService,
    PageHierarchyService,
  ],
  imports: [StorageModule],
})
export class PageModule {}
@@ -0,0 +1,56 @@
import { Injectable, Logger } from '@nestjs/common';
import { InjectKysely } from 'nestjs-kysely';
import { KyselyDB } from '@docmost/db/types/kysely.types';
import { PageHierarchyRepo } from '@docmost/db/repos/page/page-hierarchy.repo';
import { executeTx } from '@docmost/db/utils';
/** Outcome of a rebuild attempt: whether it ran, and how many rows it wrote. */
type RebuildResult = { rebuilt: boolean; count: number };

/**
 * Application-level entry points for rebuilding the page-hierarchy
 * closure table, workspace-wide or for a single space.
 *
 * Each rebuild runs inside one transaction and first takes a non-blocking
 * advisory lock via PageHierarchyRepo; when another process already holds
 * the lock, the rebuild is skipped and `{ rebuilt: false, count: 0 }`
 * is returned instead of waiting.
 */
@Injectable()
export class PageHierarchyService {
  private readonly logger = new Logger(PageHierarchyService.name);

  constructor(
    @InjectKysely() private readonly db: KyselyDB,
    private readonly pageHierarchyRepo: PageHierarchyRepo,
  ) {}

  /**
   * Rebuild hierarchy entries for every page in the workspace.
   * Returns `rebuilt: false` when the global rebuild lock is already held.
   */
  async rebuildAll(): Promise<RebuildResult> {
    return executeTx(this.db, async (trx) => {
      const acquired = await this.pageHierarchyRepo.tryAcquireGlobalLock(trx);
      if (!acquired) {
        this.logger.debug('Rebuild all skipped - another process holds the lock');
        return { rebuilt: false, count: 0 };
      }

      this.logger.log('Rebuilding hierarchy for all pages');
      const entryCount = await this.pageHierarchyRepo.rebuildAll(trx);
      this.logger.log(`Rebuilt hierarchy for all pages (${entryCount} entries)`);
      return { rebuilt: true, count: entryCount };
    });
  }

  /**
   * Rebuild hierarchy entries for a single space.
   * Returns `rebuilt: false` when that space's rebuild lock is already held.
   */
  async rebuildBySpace(spaceId: string): Promise<RebuildResult> {
    return executeTx(this.db, async (trx) => {
      const acquired = await this.pageHierarchyRepo.tryAcquireSpaceLock(
        spaceId,
        trx,
      );
      if (!acquired) {
        this.logger.debug(
          `Rebuild for space ${spaceId} skipped - another process holds the lock`,
        );
        return { rebuilt: false, count: 0 };
      }

      this.logger.log(`Rebuilding hierarchy for space ${spaceId}`);
      const entryCount = await this.pageHierarchyRepo.rebuildBySpace(
        spaceId,
        trx,
      );
      this.logger.log(
        `Rebuilt hierarchy for space ${spaceId} (${entryCount} entries)`,
      );
      return { rebuilt: true, count: entryCount };
    });
  }
}
@@ -17,6 +17,7 @@ import { SpaceRepo } from '@docmost/db/repos/space/space.repo';
import { SpaceMemberRepo } from '@docmost/db/repos/space/space-member.repo';
import { PageRepo } from './repos/page/page.repo';
import { PagePermissionRepo } from './repos/page/page-permission.repo';
import { PageHierarchyRepo } from './repos/page/page-hierarchy.repo';
import { CommentRepo } from './repos/comment/comment.repo';
import { PageHistoryRepo } from './repos/page/page-history.repo';
import { AttachmentRepo } from './repos/attachment/attachment.repo';
@@ -73,6 +74,7 @@ types.setTypeParser(types.builtins.INT8, (val) => Number(val));
SpaceMemberRepo,
PageRepo,
PagePermissionRepo,
PageHierarchyRepo,
PageHistoryRepo,
CommentRepo,
AttachmentRepo,
@@ -90,6 +92,7 @@ types.setTypeParser(types.builtins.INT8, (val) => Number(val));
SpaceMemberRepo,
PageRepo,
PagePermissionRepo,
PageHierarchyRepo,
PageHistoryRepo,
CommentRepo,
AttachmentRepo,
@@ -0,0 +1,114 @@
import { Injectable, Logger } from '@nestjs/common';
import { InjectKysely } from 'nestjs-kysely';
import { KyselyDB, KyselyTransaction } from '../../types/kysely.types';
import { sql } from 'kysely';
/**
 * Data access for the `pageHierarchy` closure table: one row per
 * (ancestorId, descendantId) pair with the `depth` between them, including
 * a depth-0 self row for every non-deleted page.
 *
 * Rebuilds are guarded by Postgres transaction-scoped advisory locks so
 * concurrent workers cannot rebuild the same scope at once; the locks are
 * released automatically when the wrapping transaction commits or rolls back.
 */
@Injectable()
export class PageHierarchyRepo {
private readonly logger = new Logger(PageHierarchyRepo.name);
constructor(@InjectKysely() private readonly db: KyselyDB) {}
/**
 * Try to take the workspace-wide rebuild lock on the current transaction.
 * Non-blocking: returns false immediately if another transaction holds it.
 */
async tryAcquireGlobalLock(trx: KyselyTransaction): Promise<boolean> {
const result = await sql<{ locked: boolean }>`
SELECT pg_try_advisory_xact_lock(hashtext('rebuild_page_hierarchy_global')) as locked
`.execute(trx);
return result.rows[0]?.locked ?? false;
}
/**
 * Try to take a per-space rebuild lock (non-blocking, xact-scoped).
 * NOTE(review): hashtext() folds the key into a 32-bit int, so two distinct
 * space ids could in principle collide and serialize unnecessarily; this is
 * harmless for correctness but worth knowing.
 */
async tryAcquireSpaceLock(
spaceId: string,
trx: KyselyTransaction,
): Promise<boolean> {
const result = await sql<{ locked: boolean }>`
SELECT pg_try_advisory_xact_lock(hashtext(${'rebuild_page_hierarchy_space_' + spaceId})) as locked
`.execute(trx);
return result.rows[0]?.locked ?? false;
}
/**
 * Rebuild the entire closure table from scratch.
 *
 * Deletes every existing row, then regenerates all (ancestor, descendant,
 * depth) pairs with a recursive CTE over `pages`:
 *  - base case: each non-deleted page as its own ancestor at depth 0;
 *  - recursive step: join children (via parentPageId) onto already-known
 *    descendants, incrementing depth.
 * Returns the number of rows inserted. Callers hold the global advisory
 * lock and run this inside the same transaction.
 */
async rebuildAll(trx: KyselyTransaction): Promise<number> {
await trx.deleteFrom('pageHierarchy').execute();
const result = await trx
.withRecursive('pageTree', (qb) =>
qb
.selectFrom('pages')
.select([
'id as ancestorId',
'id as descendantId',
sql<number>`0`.as('depth'),
])
.where('deletedAt', 'is', null)
.unionAll((eb) =>
eb
.selectFrom('pages as p')
.innerJoin('pageTree as pt', 'p.parentPageId', 'pt.descendantId')
.select([
'pt.ancestorId',
'p.id as descendantId',
sql<number>`pt.depth + 1`.as('depth'),
])
.where('p.deletedAt', 'is', null),
),
)
.insertInto('pageHierarchy')
.columns(['ancestorId', 'descendantId', 'depth'])
.expression((eb) =>
eb
.selectFrom('pageTree')
.select(['ancestorId', 'descendantId', 'depth']),
)
.executeTakeFirst();
// numInsertedOrUpdatedRows is a bigint; coerce to number for the caller.
return Number(result?.numInsertedOrUpdatedRows ?? 0);
}
/**
 * Rebuild closure-table rows for a single space.
 *
 * First deletes rows whose descendant is a page of this space, then
 * regenerates pairs with the same recursive CTE as rebuildAll, anchored to
 * the space's non-deleted pages and constrained to the space on the
 * recursive step.
 * NOTE(review): the delete matches only on descendantId; this assumes
 * ancestor/descendant pairs never span spaces (parent and child always
 * share a spaceId) — TODO confirm against the page-move code path.
 * Returns the number of rows inserted.
 */
async rebuildBySpace(
spaceId: string,
trx: KyselyTransaction,
): Promise<number> {
await trx
.deleteFrom('pageHierarchy')
.where(
'descendantId',
'in',
trx.selectFrom('pages').select('id').where('spaceId', '=', spaceId),
)
.execute();
const result = await trx
.withRecursive('pageTree', (qb) =>
qb
.selectFrom('pages')
.select([
'id as ancestorId',
'id as descendantId',
sql<number>`0`.as('depth'),
])
.where('spaceId', '=', spaceId)
.where('deletedAt', 'is', null)
.unionAll((eb) =>
eb
.selectFrom('pages as p')
.innerJoin('pageTree as pt', 'p.parentPageId', 'pt.descendantId')
.select([
'pt.ancestorId',
'p.id as descendantId',
sql<number>`pt.depth + 1`.as('depth'),
])
.where('p.spaceId', '=', spaceId)
.where('p.deletedAt', 'is', null),
),
)
.insertInto('pageHierarchy')
.columns(['ancestorId', 'descendantId', 'depth'])
.expression((eb) =>
eb
.selectFrom('pageTree')
.select(['ancestorId', 'descendantId', 'depth']),
)
.executeTakeFirst();
// numInsertedOrUpdatedRows is a bigint; coerce to number for the caller.
return Number(result?.numInsertedOrUpdatedRows ?? 0);
}
}
@@ -6,6 +6,7 @@ export enum QueueName {
FILE_TASK_QUEUE = '{file-task-queue}',
SEARCH_QUEUE = '{search-queue}',
AI_QUEUE = '{ai-queue}',
HIERARCHY_QUEUE = '{hierarchy-queue}',
}
export enum QueueJob {
@@ -58,4 +59,8 @@ export enum QueueJob {
GENERATE_PAGE_EMBEDDINGS = 'generate-page-embeddings',
DELETE_PAGE_EMBEDDINGS = 'delete-page-embeddings',
REBUILD_HIERARCHY_ALL = 'rebuild-hierarchy-all',
REBUILD_HIERARCHY_ALL_BY_SPACE = 'rebuild-hierarchy-all-by-space',
REBUILD_HIERARCHY_SPACE = 'rebuild-hierarchy-space',
}
@@ -9,4 +9,8 @@ export interface IPageBacklinkJob {
/** Payload for the Stripe seat-count synchronization job. */
export interface IStripeSeatsSyncJob {
workspaceId: string;
}
/**
 * Payload for page-hierarchy rebuild jobs.
 * `spaceId` is required for REBUILD_HIERARCHY_SPACE (the processor throws
 * without it) and omitted for the workspace-wide rebuild jobs.
 */
export interface IRebuildHierarchyJob {
spaceId?: string;
}
@@ -0,0 +1,146 @@
import { Logger, OnModuleDestroy } from '@nestjs/common';
import { OnWorkerEvent, Processor, WorkerHost } from '@nestjs/bullmq';
import { Job } from 'bullmq';
import { InjectKysely } from 'nestjs-kysely';
import { KyselyDB } from '@docmost/db/types/kysely.types';
import { QueueJob, QueueName } from '../constants';
import { IRebuildHierarchyJob } from '../constants/queue.interface';
import { PageHierarchyRepo } from '@docmost/db/repos/page/page-hierarchy.repo';
import { executeTx } from '@docmost/db/utils';
// Job names this processor owns; used to filter worker lifecycle events,
// since other processors' events would otherwise be logged here too.
const HIERARCHY_JOBS = [
  QueueJob.REBUILD_HIERARCHY_ALL,
  QueueJob.REBUILD_HIERARCHY_ALL_BY_SPACE,
  QueueJob.REBUILD_HIERARCHY_SPACE,
] as const;

/**
 * BullMQ worker for page-hierarchy (closure table) rebuild jobs.
 *
 * Each rebuild runs in a transaction and takes a non-blocking Postgres
 * advisory lock via PageHierarchyRepo, so overlapping rebuild jobs from
 * multiple workers are skipped instead of colliding.
 */
@Processor(QueueName.HIERARCHY_QUEUE)
export class HierarchyProcessor extends WorkerHost implements OnModuleDestroy {
  private readonly logger = new Logger(HierarchyProcessor.name);

  constructor(
    @InjectKysely() private readonly db: KyselyDB,
    private readonly pageHierarchyRepo: PageHierarchyRepo,
  ) {
    super();
  }

  /**
   * Dispatch an incoming job to the matching rebuild routine.
   * Errors are deliberately NOT caught here: letting them propagate is what
   * triggers BullMQ's retry/backoff, and the 'failed' worker event below
   * handles logging. (The previous `try { ... } catch (err) { throw err; }`
   * was a no-op and has been removed.)
   */
  async process(job: Job<IRebuildHierarchyJob, void>): Promise<void> {
    switch (job.name) {
      case QueueJob.REBUILD_HIERARCHY_ALL:
        await this.rebuildAll();
        break;
      case QueueJob.REBUILD_HIERARCHY_ALL_BY_SPACE:
        await this.rebuildAllBySpace();
        break;
      case QueueJob.REBUILD_HIERARCHY_SPACE:
        if (!job.data.spaceId) {
          throw new Error('spaceId is required for space rebuild');
        }
        await this.rebuildBySpace(job.data.spaceId);
        break;
      default:
        // Previously unknown job names were silently ignored; surface them.
        this.logger.warn(`Received unknown job ${job.name} - skipping`);
    }
  }

  /** Rebuild the whole closure table, skipping if the global lock is held. */
  private async rebuildAll(): Promise<void> {
    await executeTx(this.db, async (trx) => {
      const locked = await this.pageHierarchyRepo.tryAcquireGlobalLock(trx);
      if (!locked) {
        this.logger.debug(
          'Rebuild all skipped - another process holds the lock',
        );
        return;
      }
      this.logger.debug('Rebuilding hierarchy for all pages');
      const count = await this.pageHierarchyRepo.rebuildAll(trx);
      this.logger.debug(`Rebuilt hierarchy for all pages (${count} entries)`);
    });
  }

  /** Rebuild one space's closure rows, skipping if its lock is held. */
  private async rebuildBySpace(spaceId: string): Promise<void> {
    await executeTx(this.db, async (trx) => {
      const locked = await this.pageHierarchyRepo.tryAcquireSpaceLock(
        spaceId,
        trx,
      );
      if (!locked) {
        this.logger.debug(
          `Rebuild for space ${spaceId} skipped - another process holds the lock`,
        );
        return;
      }
      this.logger.debug(`Rebuilding hierarchy for space ${spaceId}`);
      const count = await this.pageHierarchyRepo.rebuildBySpace(spaceId, trx);
      this.logger.debug(
        `Rebuilt hierarchy for space ${spaceId} (${count} entries)`,
      );
    });
  }

  /**
   * Rebuild every space, one transaction per space, paging through space ids
   * keyset-style (id > lastId) in batches of 100 to bound memory use.
   */
  private async rebuildAllBySpace(): Promise<void> {
    let lastId: string | null = null;
    const BATCH_SIZE = 100;
    let totalSpaces = 0;

    this.logger.debug('Starting hierarchy rebuild for all spaces');
    while (true) {
      const spaces = await this.db
        .selectFrom('spaces')
        .select('id')
        .$if(Boolean(lastId), (qb) => qb.where('id', '>', lastId!))
        .orderBy('id', 'asc')
        .limit(BATCH_SIZE)
        .execute();

      if (spaces.length === 0) break;

      for (const space of spaces) {
        await this.rebuildBySpace(space.id);
        totalSpaces++;
      }

      lastId = spaces[spaces.length - 1].id;
      this.logger.debug(`Rebuilt hierarchy for ${totalSpaces} spaces...`);
    }
    this.logger.debug(`Completed hierarchy rebuild for ${totalSpaces} spaces`);
  }

  @OnWorkerEvent('active')
  onActive(job: Job) {
    if (HIERARCHY_JOBS.includes(job.name as (typeof HIERARCHY_JOBS)[number])) {
      this.logger.debug(`Processing ${job.name} job`);
    }
  }

  @OnWorkerEvent('failed')
  onError(job: Job) {
    if (HIERARCHY_JOBS.includes(job.name as (typeof HIERARCHY_JOBS)[number])) {
      this.logger.error(
        `Error processing ${job.name} job. Reason: ${job.failedReason}`,
      );
    }
  }

  @OnWorkerEvent('completed')
  onCompleted(job: Job) {
    if (HIERARCHY_JOBS.includes(job.name as (typeof HIERARCHY_JOBS)[number])) {
      this.logger.debug(`Completed ${job.name} job`);
    }
  }

  /** Close the underlying BullMQ worker on shutdown so jobs drain cleanly. */
  async onModuleDestroy(): Promise<void> {
    if (this.worker) {
      await this.worker.close();
    }
  }
}
@@ -4,6 +4,7 @@ import { EnvironmentService } from '../environment/environment.service';
import { createRetryStrategy, parseRedisUrl } from '../../common/helpers';
import { QueueName } from './constants';
import { BacklinksProcessor } from './processors/backlinks.processor';
import { HierarchyProcessor } from './processors/hierarchy.processor';
@Global()
@Module({
@@ -73,8 +74,22 @@ import { BacklinksProcessor } from './processors/backlinks.processor';
attempts: 1,
},
}),
BullModule.registerQueue({
name: QueueName.HIERARCHY_QUEUE,
defaultJobOptions: {
removeOnComplete: true,
removeOnFail: {
count: 50,
},
attempts: 3,
backoff: {
type: 'exponential',
delay: 20 * 1000,
},
},
}),
],
exports: [BullModule],
// NOTE(review): collapsed the stale pre-change `providers` line that
// duplicated this key (duplicate object keys are invalid TS).
providers: [BacklinksProcessor, HierarchyProcessor],
})
export class QueueModule {}