From 873dd3bb51fc539aa72d8039ec9a27b727854124 Mon Sep 17 00:00:00 2001 From: Philipinho <16838612+Philipinho@users.noreply.github.com> Date: Tue, 6 Jan 2026 22:36:55 +0000 Subject: [PATCH] WIP - repair check --- .../repos/page/page-hierarchy.repo.ts | 99 +++++++++++++++++++ .../queue/processors/hierarchy.processor.ts | 22 +---- 2 files changed, 104 insertions(+), 17 deletions(-) diff --git a/apps/server/src/database/repos/page/page-hierarchy.repo.ts b/apps/server/src/database/repos/page/page-hierarchy.repo.ts index 49cfd847..de7679c8 100644 --- a/apps/server/src/database/repos/page/page-hierarchy.repo.ts +++ b/apps/server/src/database/repos/page/page-hierarchy.repo.ts @@ -2,6 +2,18 @@ import { Injectable, Logger } from '@nestjs/common'; import { InjectKysely } from 'nestjs-kysely'; import { KyselyDB, KyselyTransaction } from '../../types/kysely.types'; import { sql } from 'kysely'; +import { executeTx } from '../../utils'; + +export type IntegrityReport = { + healthy: boolean; + extraEntries: number; + missingEntries: number; + affectedSpaceIds: string[]; +}; + +export type RepairResult = { + rebuiltSpaces: number; +}; @Injectable() export class PageHierarchyRepo { @@ -111,4 +123,91 @@ export class PageHierarchyRepo { return Number(result?.numInsertedOrUpdatedRows ?? 0); } + + async checkIntegrity(): Promise { + const result = await this.db + .withRecursive('expected', (qb) => + qb + .selectFrom('pages') + .select([ + 'id as ancestorId', + 'id as descendantId', + sql`0`.as('depth'), + 'spaceId', + ]) + .where('deletedAt', 'is', null) + .unionAll((eb) => + eb + .selectFrom('pages as p') + .innerJoin('expected as e', 'p.parentPageId', 'e.descendantId') + .select([ + 'e.ancestorId', + 'p.id as descendantId', + sql`e.depth + 1`.as('depth'), + 'p.spaceId', + ]) + .where('p.deletedAt', 'is', null), + ), + ) + .selectFrom('expected as e') + .fullJoin('pageHierarchy as ph', (join) => + join + .onRef('e.ancestorId', '=', 'ph.ancestorId') + .onRef('e.descendantId', '=', 'ph.descendantId'), + ) + .leftJoin('pages as p', 'ph.descendantId', 'p.id') + .select([ + sql`count(*) filter (where e.ancestor_id is null and ph.ancestor_id is not null)`.as( + 'extraCount', + ), + sql`count(*) filter (where ph.ancestor_id is null and e.ancestor_id is not null)`.as( + 'missingCount', + ), + sql< + string[] + >`array_agg(distinct coalesce(e.space_id, p.space_id)) filter (where + (e.ancestor_id is null and ph.ancestor_id is not null) or + (ph.ancestor_id is null and e.ancestor_id is not null) + )`.as('affectedSpaceIds'), + ]) + .executeTakeFirst(); + + const extraCount = Number(result?.extraCount ?? 0); + const missingCount = Number(result?.missingCount ?? 0); + const affectedSpaceIds = (result?.affectedSpaceIds ?? []).filter(Boolean); + + return { + healthy: extraCount === 0 && missingCount === 0, + extraEntries: extraCount, + missingEntries: missingCount, + affectedSpaceIds, + }; + } + + async repair(): Promise { + const report = await this.checkIntegrity(); + + if (report.healthy) { + return { rebuiltSpaces: 0 }; + } + + let rebuiltSpaces = 0; + + for (const spaceId of report.affectedSpaceIds) { + await executeTx(this.db, async (trx) => { + const locked = await this.tryAcquireSpaceLock(spaceId, trx); + if (!locked) { + this.logger.debug( + `Repair for space ${spaceId} skipped - another process holds the lock`, + ); + return; + } + + await this.rebuildBySpace(spaceId, trx); + rebuiltSpaces++; + }); + } + + return { rebuiltSpaces }; + } } diff --git a/apps/server/src/integrations/queue/processors/hierarchy.processor.ts b/apps/server/src/integrations/queue/processors/hierarchy.processor.ts index 590e04df..d1a1976a 100644 --- a/apps/server/src/integrations/queue/processors/hierarchy.processor.ts +++ b/apps/server/src/integrations/queue/processors/hierarchy.processor.ts @@ -8,12 +8,6 @@ import { IRebuildHierarchyJob } from '../constants/queue.interface'; import { PageHierarchyRepo } from '@docmost/db/repos/page/page-hierarchy.repo'; import { executeTx } from '@docmost/db/utils'; -const HIERARCHY_JOBS = [ - QueueJob.REBUILD_HIERARCHY_ALL, - QueueJob.REBUILD_HIERARCHY_ALL_BY_SPACE, - QueueJob.REBUILD_HIERARCHY_SPACE, -] as const; - @Processor(QueueName.HIERARCHY_QUEUE) export class HierarchyProcessor extends WorkerHost implements OnModuleDestroy { private readonly logger = new Logger(HierarchyProcessor.name); @@ -117,25 +111,19 @@ export class HierarchyProcessor extends WorkerHost implements OnModuleDestroy { @OnWorkerEvent('active') onActive(job: Job) { - if (HIERARCHY_JOBS.includes(job.name as (typeof HIERARCHY_JOBS)[number])) { - this.logger.debug(`Processing ${job.name} job`); - } + this.logger.debug(`Processing ${job.name} job`); } @OnWorkerEvent('failed') onError(job: Job) { - if (HIERARCHY_JOBS.includes(job.name as (typeof HIERARCHY_JOBS)[number])) { - this.logger.error( - `Error processing ${job.name} job. Reason: ${job.failedReason}`, - ); - } + this.logger.error( + `Error processing ${job.name} job. Reason: ${job.failedReason}`, + ); } @OnWorkerEvent('completed') onCompleted(job: Job) { - if (HIERARCHY_JOBS.includes(job.name as (typeof HIERARCHY_JOBS)[number])) { - this.logger.debug(`Completed ${job.name} job`); - } + this.logger.debug(`Completed ${job.name} job`); } async onModuleDestroy(): Promise {