mirror of
https://github.com/docmost/docmost.git
synced 2026-05-07 06:23:06 +08:00
WIP - repair check
This commit is contained in:
@@ -2,6 +2,18 @@ import { Injectable, Logger } from '@nestjs/common';
|
||||
import { InjectKysely } from 'nestjs-kysely';
|
||||
import { KyselyDB, KyselyTransaction } from '../../types/kysely.types';
|
||||
import { sql } from 'kysely';
|
||||
import { executeTx } from '../../utils';
|
||||
|
||||
export type IntegrityReport = {
|
||||
healthy: boolean;
|
||||
extraEntries: number;
|
||||
missingEntries: number;
|
||||
affectedSpaceIds: string[];
|
||||
};
|
||||
|
||||
export type RepairResult = {
|
||||
rebuiltSpaces: number;
|
||||
};
|
||||
|
||||
@Injectable()
|
||||
export class PageHierarchyRepo {
|
||||
@@ -111,4 +123,91 @@ export class PageHierarchyRepo {
|
||||
|
||||
return Number(result?.numInsertedOrUpdatedRows ?? 0);
|
||||
}
|
||||
|
||||
async checkIntegrity(): Promise<IntegrityReport> {
|
||||
const result = await this.db
|
||||
.withRecursive('expected', (qb) =>
|
||||
qb
|
||||
.selectFrom('pages')
|
||||
.select([
|
||||
'id as ancestorId',
|
||||
'id as descendantId',
|
||||
sql<number>`0`.as('depth'),
|
||||
'spaceId',
|
||||
])
|
||||
.where('deletedAt', 'is', null)
|
||||
.unionAll((eb) =>
|
||||
eb
|
||||
.selectFrom('pages as p')
|
||||
.innerJoin('expected as e', 'p.parentPageId', 'e.descendantId')
|
||||
.select([
|
||||
'e.ancestorId',
|
||||
'p.id as descendantId',
|
||||
sql<number>`e.depth + 1`.as('depth'),
|
||||
'p.spaceId',
|
||||
])
|
||||
.where('p.deletedAt', 'is', null),
|
||||
),
|
||||
)
|
||||
.selectFrom('expected as e')
|
||||
.fullJoin('pageHierarchy as ph', (join) =>
|
||||
join
|
||||
.onRef('e.ancestorId', '=', 'ph.ancestorId')
|
||||
.onRef('e.descendantId', '=', 'ph.descendantId'),
|
||||
)
|
||||
.leftJoin('pages as p', 'ph.descendantId', 'p.id')
|
||||
.select([
|
||||
sql<number>`count(*) filter (where e.ancestor_id is null and ph.ancestor_id is not null)`.as(
|
||||
'extraCount',
|
||||
),
|
||||
sql<number>`count(*) filter (where ph.ancestor_id is null and e.ancestor_id is not null)`.as(
|
||||
'missingCount',
|
||||
),
|
||||
sql<
|
||||
string[]
|
||||
>`array_agg(distinct coalesce(e.space_id, p.space_id)) filter (where
|
||||
(e.ancestor_id is null and ph.ancestor_id is not null) or
|
||||
(ph.ancestor_id is null and e.ancestor_id is not null)
|
||||
)`.as('affectedSpaceIds'),
|
||||
])
|
||||
.executeTakeFirst();
|
||||
|
||||
const extraCount = Number(result?.extraCount ?? 0);
|
||||
const missingCount = Number(result?.missingCount ?? 0);
|
||||
const affectedSpaceIds = (result?.affectedSpaceIds ?? []).filter(Boolean);
|
||||
|
||||
return {
|
||||
healthy: extraCount === 0 && missingCount === 0,
|
||||
extraEntries: extraCount,
|
||||
missingEntries: missingCount,
|
||||
affectedSpaceIds,
|
||||
};
|
||||
}
|
||||
|
||||
async repair(): Promise<RepairResult> {
|
||||
const report = await this.checkIntegrity();
|
||||
|
||||
if (report.healthy) {
|
||||
return { rebuiltSpaces: 0 };
|
||||
}
|
||||
|
||||
let rebuiltSpaces = 0;
|
||||
|
||||
for (const spaceId of report.affectedSpaceIds) {
|
||||
await executeTx(this.db, async (trx) => {
|
||||
const locked = await this.tryAcquireSpaceLock(spaceId, trx);
|
||||
if (!locked) {
|
||||
this.logger.debug(
|
||||
`Repair for space ${spaceId} skipped - another process holds the lock`,
|
||||
);
|
||||
return;
|
||||
}
|
||||
|
||||
await this.rebuildBySpace(spaceId, trx);
|
||||
rebuiltSpaces++;
|
||||
});
|
||||
}
|
||||
|
||||
return { rebuiltSpaces };
|
||||
}
|
||||
}
|
||||
|
||||
@@ -8,12 +8,6 @@ import { IRebuildHierarchyJob } from '../constants/queue.interface';
|
||||
import { PageHierarchyRepo } from '@docmost/db/repos/page/page-hierarchy.repo';
|
||||
import { executeTx } from '@docmost/db/utils';
|
||||
|
||||
const HIERARCHY_JOBS = [
|
||||
QueueJob.REBUILD_HIERARCHY_ALL,
|
||||
QueueJob.REBUILD_HIERARCHY_ALL_BY_SPACE,
|
||||
QueueJob.REBUILD_HIERARCHY_SPACE,
|
||||
] as const;
|
||||
|
||||
@Processor(QueueName.HIERARCHY_QUEUE)
|
||||
export class HierarchyProcessor extends WorkerHost implements OnModuleDestroy {
|
||||
private readonly logger = new Logger(HierarchyProcessor.name);
|
||||
@@ -117,25 +111,19 @@ export class HierarchyProcessor extends WorkerHost implements OnModuleDestroy {
|
||||
|
||||
@OnWorkerEvent('active')
|
||||
onActive(job: Job) {
|
||||
if (HIERARCHY_JOBS.includes(job.name as (typeof HIERARCHY_JOBS)[number])) {
|
||||
this.logger.debug(`Processing ${job.name} job`);
|
||||
}
|
||||
this.logger.debug(`Processing ${job.name} job`);
|
||||
}
|
||||
|
||||
@OnWorkerEvent('failed')
|
||||
onError(job: Job) {
|
||||
if (HIERARCHY_JOBS.includes(job.name as (typeof HIERARCHY_JOBS)[number])) {
|
||||
this.logger.error(
|
||||
`Error processing ${job.name} job. Reason: ${job.failedReason}`,
|
||||
);
|
||||
}
|
||||
this.logger.error(
|
||||
`Error processing ${job.name} job. Reason: ${job.failedReason}`,
|
||||
);
|
||||
}
|
||||
|
||||
@OnWorkerEvent('completed')
|
||||
onCompleted(job: Job) {
|
||||
if (HIERARCHY_JOBS.includes(job.name as (typeof HIERARCHY_JOBS)[number])) {
|
||||
this.logger.debug(`Completed ${job.name} job`);
|
||||
}
|
||||
this.logger.debug(`Completed ${job.name} job`);
|
||||
}
|
||||
|
||||
async onModuleDestroy(): Promise<void> {
|
||||
|
||||
Reference in New Issue
Block a user