- CTE approach

- Remove closure table usage
This commit is contained in:
Philipinho
2026-01-11 04:36:32 +00:00
parent 4c635b4faf
commit a5696bb8e8
11 changed files with 219 additions and 727 deletions
@@ -27,7 +27,6 @@ export class AddPagePermissionDto extends PageIdDto {
@ArrayMaxSize(25, {
message: 'userIds must be an array with no more than 25 elements',
})
@ArrayMinSize(1)
@IsUUID('all', { each: true })
userIds?: string[];
@@ -36,7 +35,6 @@ export class AddPagePermissionDto extends PageIdDto {
@ArrayMaxSize(25, {
message: 'groupIds must be an array with no more than 25 elements',
})
@ArrayMinSize(1)
@IsUUID('all', { each: true })
groupIds?: string[];
}
@@ -47,7 +45,6 @@ export class RemovePagePermissionDto extends PageIdDto {
@ArrayMaxSize(25, {
message: 'userIds must be an array with no more than 25 elements',
})
@ArrayMinSize(1)
@IsUUID('all', { each: true })
userIds?: string[];
@@ -56,7 +53,6 @@ export class RemovePagePermissionDto extends PageIdDto {
@ArrayMaxSize(25, {
message: 'groupIds must be an array with no more than 25 elements',
})
@ArrayMinSize(1)
@IsUUID('all', { each: true })
groupIds?: string[];
}
+1 -8
View File
@@ -4,7 +4,6 @@ import { PageController } from './page.controller';
import { PageHistoryService } from './services/page-history.service';
import { TrashCleanupService } from './services/trash-cleanup.service';
import { PagePermissionService } from './services/page-permission.service';
import { PageHierarchyService } from './services/page-hierarchy.service';
import { PagePermissionController } from './page-permission.controller';
import { StorageModule } from '../../integrations/storage/storage.module';
@@ -15,14 +14,8 @@ import { StorageModule } from '../../integrations/storage/storage.module';
PageHistoryService,
TrashCleanupService,
PagePermissionService,
PageHierarchyService,
],
exports: [
PageService,
PageHistoryService,
PagePermissionService,
PageHierarchyService,
],
exports: [PageService, PageHistoryService, PagePermissionService],
imports: [StorageModule],
})
export class PageModule {}
@@ -1,56 +0,0 @@
import { Injectable, Logger } from '@nestjs/common';
import { InjectKysely } from 'nestjs-kysely';
import { KyselyDB } from '@docmost/db/types/kysely.types';
import { PageHierarchyRepo } from '@docmost/db/repos/page/page-hierarchy.repo';
import { executeTx } from '@docmost/db/utils';
type RebuildResult = { rebuilt: boolean; count: number };
@Injectable()
export class PageHierarchyService {
private readonly logger = new Logger(PageHierarchyService.name);
constructor(
@InjectKysely() private readonly db: KyselyDB,
private readonly pageHierarchyRepo: PageHierarchyRepo,
) {}
async rebuildAll(): Promise<RebuildResult> {
return executeTx(this.db, async (trx) => {
const locked = await this.pageHierarchyRepo.tryAcquireGlobalLock(trx);
if (!locked) {
this.logger.debug('Rebuild all skipped - another process holds the lock');
return { rebuilt: false, count: 0 };
}
this.logger.log('Rebuilding hierarchy for all pages');
const count = await this.pageHierarchyRepo.rebuildAll(trx);
this.logger.log(`Rebuilt hierarchy for all pages (${count} entries)`);
return { rebuilt: true, count };
});
}
async rebuildBySpace(spaceId: string): Promise<RebuildResult> {
return executeTx(this.db, async (trx) => {
const locked = await this.pageHierarchyRepo.tryAcquireSpaceLock(
spaceId,
trx,
);
if (!locked) {
this.logger.debug(
`Rebuild for space ${spaceId} skipped - another process holds the lock`,
);
return { rebuilt: false, count: 0 };
}
this.logger.log(`Rebuilding hierarchy for space ${spaceId}`);
const count = await this.pageHierarchyRepo.rebuildBySpace(spaceId, trx);
this.logger.log(
`Rebuilt hierarchy for space ${spaceId} (${count} entries)`,
);
return { rebuilt: true, count };
});
}
}
@@ -17,7 +17,6 @@ import { SpaceRepo } from '@docmost/db/repos/space/space.repo';
import { SpaceMemberRepo } from '@docmost/db/repos/space/space-member.repo';
import { PageRepo } from './repos/page/page.repo';
import { PagePermissionRepo } from './repos/page/page-permission.repo';
import { PageHierarchyRepo } from './repos/page/page-hierarchy.repo';
import { CommentRepo } from './repos/comment/comment.repo';
import { PageHistoryRepo } from './repos/page/page-history.repo';
import { AttachmentRepo } from './repos/attachment/attachment.repo';
@@ -74,7 +73,6 @@ types.setTypeParser(types.builtins.INT8, (val) => Number(val));
SpaceMemberRepo,
PageRepo,
PagePermissionRepo,
PageHierarchyRepo,
PageHistoryRepo,
CommentRepo,
AttachmentRepo,
@@ -92,7 +90,6 @@ types.setTypeParser(types.builtins.INT8, (val) => Number(val));
SpaceMemberRepo,
PageRepo,
PagePermissionRepo,
PageHierarchyRepo,
PageHistoryRepo,
CommentRepo,
AttachmentRepo,
@@ -1,204 +0,0 @@
import { Kysely, sql } from 'kysely';
export async function up(db: Kysely<any>): Promise<void> {
await db.schema
.createTable('page_hierarchy')
.ifNotExists()
.addColumn('ancestor_id', 'uuid', (col) =>
col.notNull().references('pages.id').onDelete('cascade'),
)
.addColumn('descendant_id', 'uuid', (col) =>
col.notNull().references('pages.id').onDelete('cascade'),
)
.addColumn('depth', 'integer', (col) => col.notNull().defaultTo(0))
.addPrimaryKeyConstraint('page_hierarchy_pkey', [
'ancestor_id',
'descendant_id',
])
.execute();
// indexes
await db.schema
.createIndex('idx_page_hierarchy_descendant')
.ifNotExists()
.on('page_hierarchy')
.column('descendant_id')
.execute();
await db.schema
.createIndex('idx_page_hierarchy_ancestor_depth')
.ifNotExists()
.on('page_hierarchy')
.columns(['ancestor_id', 'depth'])
.execute();
await db.schema
.createIndex('idx_page_hierarchy_descendant_depth')
.ifNotExists()
.on('page_hierarchy')
.columns(['descendant_id', 'depth'])
.execute();
// rebuild function
await sql`
CREATE OR REPLACE FUNCTION rebuild_page_hierarchy()
RETURNS void
LANGUAGE plpgsql
AS $$
BEGIN
IF NOT pg_try_advisory_xact_lock(hashtext('rebuild_page_hierarchy')) THEN
RETURN;
END IF;
TRUNCATE page_hierarchy;
WITH RECURSIVE page_tree AS (
SELECT id AS ancestor_id, id AS descendant_id, 0 AS depth
FROM pages WHERE deleted_at IS NULL
UNION ALL
SELECT pt.ancestor_id, p.id AS descendant_id, pt.depth + 1
FROM page_tree pt
JOIN pages p ON p.parent_page_id = pt.descendant_id
WHERE p.deleted_at IS NULL
)
INSERT INTO page_hierarchy (ancestor_id, descendant_id, depth)
SELECT ancestor_id, descendant_id, depth FROM page_tree;
END;
$$;
`.execute(db);
// Create insert trigger function
await sql`
CREATE OR REPLACE FUNCTION page_hierarchy_after_insert()
RETURNS TRIGGER
LANGUAGE plpgsql
AS $$
BEGIN
IF NEW.deleted_at IS NOT NULL THEN
RETURN NEW;
END IF;
IF NEW.parent_page_id IS NULL THEN
INSERT INTO page_hierarchy (ancestor_id, descendant_id, depth)
VALUES (NEW.id, NEW.id, 0);
ELSE
INSERT INTO page_hierarchy (ancestor_id, descendant_id, depth)
SELECT ancestor_id, NEW.id, depth + 1
FROM page_hierarchy
WHERE descendant_id = NEW.parent_page_id
UNION ALL
SELECT NEW.id, NEW.id, 0;
END IF;
RETURN NEW;
END;
$$;
`.execute(db);
await sql`
CREATE OR REPLACE TRIGGER page_hierarchy_after_insert_trigger
AFTER INSERT ON pages
FOR EACH ROW
EXECUTE FUNCTION page_hierarchy_after_insert();
`.execute(db);
// Create update trigger function
await sql`
CREATE OR REPLACE FUNCTION page_hierarchy_after_update()
RETURNS TRIGGER
LANGUAGE plpgsql
AS $$
DECLARE
subtree_ids UUID[];
BEGIN
-- Only process if parent_page_id or deleted_at changed
IF OLD.parent_page_id IS NOT DISTINCT FROM NEW.parent_page_id
AND OLD.deleted_at IS NOT DISTINCT FROM NEW.deleted_at THEN
RETURN NEW;
END IF;
-- Handle soft-delete: remove from closure when deleted_at is set
IF OLD.deleted_at IS NULL AND NEW.deleted_at IS NOT NULL THEN
SELECT array_agg(descendant_id) INTO subtree_ids
FROM page_hierarchy
WHERE ancestor_id = NEW.id;
DELETE FROM page_hierarchy
WHERE descendant_id = ANY(subtree_ids);
RETURN NEW;
END IF;
-- Handle restore: rebuild closure when deleted_at is cleared
IF OLD.deleted_at IS NOT NULL AND NEW.deleted_at IS NULL THEN
IF NEW.parent_page_id IS NULL THEN
INSERT INTO page_hierarchy (ancestor_id, descendant_id, depth)
VALUES (NEW.id, NEW.id, 0);
ELSE
INSERT INTO page_hierarchy (ancestor_id, descendant_id, depth)
SELECT ancestor_id, NEW.id, depth + 1
FROM page_hierarchy
WHERE descendant_id = NEW.parent_page_id
UNION ALL
SELECT NEW.id, NEW.id, 0;
END IF;
RETURN NEW;
END IF;
-- Skip if page is soft-deleted
IF NEW.deleted_at IS NOT NULL THEN
RETURN NEW;
END IF;
-- Move operation: parent changed
-- Get all descendants of the moved page (including itself)
SELECT array_agg(descendant_id) INTO subtree_ids
FROM page_hierarchy
WHERE ancestor_id = NEW.id;
-- Delete old ancestor relationships (keep internal subtree links)
DELETE FROM page_hierarchy
WHERE descendant_id = ANY(subtree_ids)
AND NOT (ancestor_id = ANY(subtree_ids));
-- Insert new ancestor relationships (if new parent exists)
IF NEW.parent_page_id IS NOT NULL THEN
INSERT INTO page_hierarchy (ancestor_id, descendant_id, depth)
SELECT
new_anc.ancestor_id,
sub.descendant_id,
new_anc.depth + sub.depth + 1
FROM page_hierarchy new_anc
CROSS JOIN page_hierarchy sub
WHERE new_anc.descendant_id = NEW.parent_page_id
AND sub.ancestor_id = NEW.id
AND sub.descendant_id = ANY(subtree_ids);
END IF;
RETURN NEW;
END;
$$;
`.execute(db);
await sql`
CREATE OR REPLACE TRIGGER page_hierarchy_after_update_trigger
AFTER UPDATE ON pages
FOR EACH ROW
EXECUTE FUNCTION page_hierarchy_after_update();
`.execute(db);
await sql`SELECT rebuild_page_hierarchy()`.execute(db);
}
export async function down(db: Kysely<any>): Promise<void> {
await sql`DROP TRIGGER IF EXISTS page_hierarchy_after_update_trigger ON pages`.execute(
db,
);
await sql`DROP TRIGGER IF EXISTS page_hierarchy_after_insert_trigger ON pages`.execute(
db,
);
await sql`DROP FUNCTION IF EXISTS page_hierarchy_after_update()`.execute(db);
await sql`DROP FUNCTION IF EXISTS page_hierarchy_after_insert()`.execute(db);
await sql`DROP FUNCTION IF EXISTS rebuild_page_hierarchy()`.execute(db);
await db.schema.dropTable('page_hierarchy').ifExists().execute();
}
@@ -1,213 +0,0 @@
import { Injectable, Logger } from '@nestjs/common';
import { InjectKysely } from 'nestjs-kysely';
import { KyselyDB, KyselyTransaction } from '../../types/kysely.types';
import { sql } from 'kysely';
import { executeTx } from '../../utils';
export type IntegrityReport = {
healthy: boolean;
extraEntries: number;
missingEntries: number;
affectedSpaceIds: string[];
};
export type RepairResult = {
rebuiltSpaces: number;
};
@Injectable()
export class PageHierarchyRepo {
private readonly logger = new Logger(PageHierarchyRepo.name);
constructor(@InjectKysely() private readonly db: KyselyDB) {}
async tryAcquireGlobalLock(trx: KyselyTransaction): Promise<boolean> {
const result = await sql<{ locked: boolean }>`
SELECT pg_try_advisory_xact_lock(hashtext('rebuild_page_hierarchy_global')) as locked
`.execute(trx);
return result.rows[0]?.locked ?? false;
}
async tryAcquireSpaceLock(
spaceId: string,
trx: KyselyTransaction,
): Promise<boolean> {
const result = await sql<{ locked: boolean }>`
SELECT pg_try_advisory_xact_lock(hashtext(${'rebuild_page_hierarchy_space_' + spaceId})) as locked
`.execute(trx);
return result.rows[0]?.locked ?? false;
}
async rebuildAll(trx: KyselyTransaction): Promise<number> {
await trx.deleteFrom('pageHierarchy').execute();
const result = await trx
.withRecursive('pageTree', (qb) =>
qb
.selectFrom('pages')
.select([
'id as ancestorId',
'id as descendantId',
sql<number>`0`.as('depth'),
])
.where('deletedAt', 'is', null)
.unionAll((eb) =>
eb
.selectFrom('pages as p')
.innerJoin('pageTree as pt', 'p.parentPageId', 'pt.descendantId')
.select([
'pt.ancestorId',
'p.id as descendantId',
sql<number>`pt.depth + 1`.as('depth'),
])
.where('p.deletedAt', 'is', null),
),
)
.insertInto('pageHierarchy')
.columns(['ancestorId', 'descendantId', 'depth'])
.expression((eb) =>
eb
.selectFrom('pageTree')
.select(['ancestorId', 'descendantId', 'depth']),
)
.executeTakeFirst();
return Number(result?.numInsertedOrUpdatedRows ?? 0);
}
async rebuildBySpace(
spaceId: string,
trx: KyselyTransaction,
): Promise<number> {
await trx
.deleteFrom('pageHierarchy')
.where(
'descendantId',
'in',
trx.selectFrom('pages').select('id').where('spaceId', '=', spaceId),
)
.execute();
const result = await trx
.withRecursive('pageTree', (qb) =>
qb
.selectFrom('pages')
.select([
'id as ancestorId',
'id as descendantId',
sql<number>`0`.as('depth'),
])
.where('spaceId', '=', spaceId)
.where('deletedAt', 'is', null)
.unionAll((eb) =>
eb
.selectFrom('pages as p')
.innerJoin('pageTree as pt', 'p.parentPageId', 'pt.descendantId')
.select([
'pt.ancestorId',
'p.id as descendantId',
sql<number>`pt.depth + 1`.as('depth'),
])
.where('p.spaceId', '=', spaceId)
.where('p.deletedAt', 'is', null),
),
)
.insertInto('pageHierarchy')
.columns(['ancestorId', 'descendantId', 'depth'])
.expression((eb) =>
eb
.selectFrom('pageTree')
.select(['ancestorId', 'descendantId', 'depth']),
)
.executeTakeFirst();
return Number(result?.numInsertedOrUpdatedRows ?? 0);
}
async checkIntegrity(): Promise<IntegrityReport> {
const result = await this.db
.withRecursive('expected', (qb) =>
qb
.selectFrom('pages')
.select([
'id as ancestorId',
'id as descendantId',
sql<number>`0`.as('depth'),
'spaceId',
])
.where('deletedAt', 'is', null)
.unionAll((eb) =>
eb
.selectFrom('pages as p')
.innerJoin('expected as e', 'p.parentPageId', 'e.descendantId')
.select([
'e.ancestorId',
'p.id as descendantId',
sql<number>`e.depth + 1`.as('depth'),
'p.spaceId',
])
.where('p.deletedAt', 'is', null),
),
)
.selectFrom('expected as e')
.fullJoin('pageHierarchy as ph', (join) =>
join
.onRef('e.ancestorId', '=', 'ph.ancestorId')
.onRef('e.descendantId', '=', 'ph.descendantId'),
)
.leftJoin('pages as p', 'ph.descendantId', 'p.id')
.select([
sql<number>`count(*) filter (where e.ancestor_id is null and ph.ancestor_id is not null)`.as(
'extraCount',
),
sql<number>`count(*) filter (where ph.ancestor_id is null and e.ancestor_id is not null)`.as(
'missingCount',
),
sql<
string[]
>`array_agg(distinct coalesce(e.space_id, p.space_id)) filter (where
(e.ancestor_id is null and ph.ancestor_id is not null) or
(ph.ancestor_id is null and e.ancestor_id is not null)
)`.as('affectedSpaceIds'),
])
.executeTakeFirst();
const extraCount = Number(result?.extraCount ?? 0);
const missingCount = Number(result?.missingCount ?? 0);
const affectedSpaceIds = (result?.affectedSpaceIds ?? []).filter(Boolean);
return {
healthy: extraCount === 0 && missingCount === 0,
extraEntries: extraCount,
missingEntries: missingCount,
affectedSpaceIds,
};
}
async repair(): Promise<RepairResult> {
const report = await this.checkIntegrity();
if (report.healthy) {
return { rebuiltSpaces: 0 };
}
let rebuiltSpaces = 0;
for (const spaceId of report.affectedSpaceIds) {
await executeTx(this.db, async (trx) => {
const locked = await this.tryAcquireSpaceLock(spaceId, trx);
if (!locked) {
this.logger.debug(
`Repair for space ${spaceId} skipped - another process holds the lock`,
);
return;
}
await this.rebuildBySpace(spaceId, trx);
rebuiltSpaces++;
});
}
return { rebuiltSpaces };
}
}
@@ -10,9 +10,10 @@ import {
} from '@docmost/db/types/entity.types';
import { PaginationOptions } from '@docmost/db/pagination/pagination-options';
import { executeWithPagination } from '@docmost/db/pagination/pagination';
import { sql, SqlBool } from 'kysely';
import { ExpressionBuilder, sql, SqlBool } from 'kysely';
import { GroupRepo } from '@docmost/db/repos/group/group.repo';
import { GroupUserRepo } from '@docmost/db/repos/group/group-user.repo';
import { DB } from '@docmost/db/types/db';
@Injectable()
export class PagePermissionRepo {
@@ -300,15 +301,34 @@ export class PagePermissionRepo {
{ pageId: string; accessLevel: string; depth: number } | undefined
> {
return this.db
.selectFrom('pageHierarchy')
.innerJoin('pageAccess', 'pageAccess.pageId', 'pageHierarchy.ancestorId')
.withRecursive('ancestors', (qb) =>
qb
.selectFrom('pages')
.select([
'pages.id as ancestorId',
'pages.parentPageId',
sql<number>`0`.as('depth'),
])
.where('pages.id', '=', pageId)
.unionAll((eb) =>
eb
.selectFrom('pages')
.innerJoin('ancestors', 'ancestors.parentPageId', 'pages.id')
.select([
'pages.id as ancestorId',
'pages.parentPageId',
sql<number>`ancestors.depth + 1`.as('depth'),
]),
),
)
.selectFrom('ancestors')
.innerJoin('pageAccess', 'pageAccess.pageId', 'ancestors.ancestorId')
.select([
'pageAccess.pageId',
'pageAccess.accessLevel',
'pageHierarchy.depth',
'ancestors.depth',
])
.where('pageHierarchy.descendantId', '=', pageId)
.orderBy('pageHierarchy.depth', 'asc')
.orderBy('ancestors.depth', 'asc')
.executeTakeFirst();
}
@@ -317,8 +337,20 @@ export class PagePermissionRepo {
*/
async canUserAccessPage(userId: string, pageId: string): Promise<boolean> {
const deniedAncestor = await this.db
.selectFrom('pageHierarchy')
.innerJoin('pageAccess', 'pageAccess.pageId', 'pageHierarchy.ancestorId')
.withRecursive('ancestors', (qb) =>
qb
.selectFrom('pages')
.select(['pages.id as ancestorId', 'pages.parentPageId'])
.where('pages.id', '=', pageId)
.unionAll((eb) =>
eb
.selectFrom('pages')
.innerJoin('ancestors', 'ancestors.parentPageId', 'pages.id')
.select(['pages.id as ancestorId', 'pages.parentPageId']),
),
)
.selectFrom('ancestors')
.innerJoin('pageAccess', 'pageAccess.pageId', 'ancestors.ancestorId')
.leftJoin('pagePermissions', (join) =>
join
.onRef('pagePermissions.pageAccessId', '=', 'pageAccess.id')
@@ -328,16 +360,12 @@ export class PagePermissionRepo {
eb(
'pagePermissions.groupId',
'in',
eb
.selectFrom('groupUsers')
.select('groupUsers.groupId')
.where('groupUsers.userId', '=', userId),
this.userGroupIdsSubquery(eb, userId),
),
]),
),
)
.select('pageAccess.pageId')
.where('pageHierarchy.descendantId', '=', pageId)
.where('pagePermissions.id', 'is', null)
.executeTakeFirst();
@@ -349,8 +377,20 @@ export class PagePermissionRepo {
*/
async canUserEditPage(userId: string, pageId: string): Promise<boolean> {
const deniedAncestor = await this.db
.selectFrom('pageHierarchy')
.innerJoin('pageAccess', 'pageAccess.pageId', 'pageHierarchy.ancestorId')
.withRecursive('ancestors', (qb) =>
qb
.selectFrom('pages')
.select(['pages.id as ancestorId', 'pages.parentPageId'])
.where('pages.id', '=', pageId)
.unionAll((eb) =>
eb
.selectFrom('pages')
.innerJoin('ancestors', 'ancestors.parentPageId', 'pages.id')
.select(['pages.id as ancestorId', 'pages.parentPageId']),
),
)
.selectFrom('ancestors')
.innerJoin('pageAccess', 'pageAccess.pageId', 'ancestors.ancestorId')
.leftJoin('pagePermissions', (join) =>
join
.onRef('pagePermissions.pageAccessId', '=', 'pageAccess.id')
@@ -361,16 +401,12 @@ export class PagePermissionRepo {
eb(
'pagePermissions.groupId',
'in',
eb
.selectFrom('groupUsers')
.select('groupUsers.groupId')
.where('groupUsers.userId', '=', userId),
this.userGroupIdsSubquery(eb, userId),
),
]),
),
)
.select('pageAccess.pageId')
.where('pageHierarchy.descendantId', '=', pageId)
.where('pagePermissions.id', 'is', null)
.executeTakeFirst();
@@ -397,6 +433,26 @@ export class PagePermissionRepo {
canEdit: boolean;
}> {
const result = await this.db
.withRecursive('ancestors', (qb) =>
qb
.selectFrom('pages')
.select([
'pages.id as ancestorId',
'pages.parentPageId',
sql<number>`0`.as('depth'),
])
.where('pages.id', '=', pageId)
.unionAll((eb) =>
eb
.selectFrom('pages')
.innerJoin('ancestors', 'ancestors.parentPageId', 'pages.id')
.select([
'pages.id as ancestorId',
'pages.parentPageId',
sql<number>`ancestors.depth + 1`.as('depth'),
]),
),
)
.selectFrom('pages')
.select((eb) => [
// hasDirectRestriction: this page itself has page_access entry
@@ -420,15 +476,14 @@ export class PagePermissionRepo {
.when(
eb.exists(
eb
.selectFrom('pageHierarchy')
.selectFrom('ancestors')
.innerJoin(
'pageAccess',
'pageAccess.pageId',
'pageHierarchy.ancestorId',
'ancestors.ancestorId',
)
.select('pageAccess.id')
.whereRef('pageHierarchy.descendantId', '=', 'pages.id')
.where('pageHierarchy.depth', '>', 0),
.where('ancestors.depth', '>', 0),
),
)
.then(true)
@@ -442,11 +497,11 @@ export class PagePermissionRepo {
eb.not(
eb.exists(
eb
.selectFrom('pageHierarchy')
.selectFrom('ancestors')
.innerJoin(
'pageAccess',
'pageAccess.pageId',
'pageHierarchy.ancestorId',
'ancestors.ancestorId',
)
.leftJoin('pagePermissions', (join) =>
join
@@ -461,16 +516,12 @@ export class PagePermissionRepo {
eb2(
'pagePermissions.groupId',
'in',
eb2
.selectFrom('groupUsers')
.select('groupUsers.groupId')
.where('groupUsers.userId', '=', userId),
this.userGroupIdsSubquery(eb2, userId),
),
]),
),
)
.select('pageAccess.pageId')
.whereRef('pageHierarchy.descendantId', '=', 'pages.id')
.where('pagePermissions.id', 'is', null),
),
),
@@ -486,11 +537,11 @@ export class PagePermissionRepo {
eb.not(
eb.exists(
eb
.selectFrom('pageHierarchy')
.selectFrom('ancestors')
.innerJoin(
'pageAccess',
'pageAccess.pageId',
'pageHierarchy.ancestorId',
'ancestors.ancestorId',
)
.leftJoin('pagePermissions', (join) =>
join
@@ -506,16 +557,12 @@ export class PagePermissionRepo {
eb2(
'pagePermissions.groupId',
'in',
eb2
.selectFrom('groupUsers')
.select('groupUsers.groupId')
.where('groupUsers.userId', '=', userId),
this.userGroupIdsSubquery(eb2, userId),
),
]),
),
)
.select('pageAccess.pageId')
.whereRef('pageHierarchy.descendantId', '=', 'pages.id')
.where('pagePermissions.id', 'is', null),
),
),
@@ -552,6 +599,30 @@ export class PagePermissionRepo {
if (pageIds.length === 0) return [];
const results = await this.db
.withRecursive('allAncestors', (qb) =>
qb
.selectFrom('pages')
.select([
'pages.id as pageId',
'pages.id as ancestorId',
'pages.parentPageId',
])
.where(sql<SqlBool>`pages.id = ANY(${pageIds}::uuid[])`)
.unionAll((eb) =>
eb
.selectFrom('pages')
.innerJoin(
'allAncestors',
'allAncestors.parentPageId',
'pages.id',
)
.select([
'allAncestors.pageId',
'pages.id as ancestorId',
'pages.parentPageId',
]),
),
)
.selectFrom('pages')
.select('pages.id')
// Check if user lacks writer permission on any restricted ancestor
@@ -562,11 +633,11 @@ export class PagePermissionRepo {
eb.not(
eb.exists(
eb
.selectFrom('pageHierarchy')
.selectFrom('allAncestors')
.innerJoin(
'pageAccess',
'pageAccess.pageId',
'pageHierarchy.ancestorId',
'allAncestors.ancestorId',
)
.leftJoin('pagePermissions', (join) =>
join
@@ -582,16 +653,13 @@ export class PagePermissionRepo {
eb2(
'pagePermissions.groupId',
'in',
eb2
.selectFrom('groupUsers')
.select('groupUsers.groupId')
.where('groupUsers.userId', '=', userId),
this.userGroupIdsSubquery(eb2, userId),
),
]),
),
)
.select('pageAccess.pageId')
.whereRef('pageHierarchy.descendantId', '=', 'pages.id')
.whereRef('allAncestors.pageId', '=', 'pages.id')
.where('pagePermissions.id', 'is', null),
),
),
@@ -606,11 +674,11 @@ export class PagePermissionRepo {
.where(({ not, exists, selectFrom }) =>
not(
exists(
selectFrom('pageHierarchy')
selectFrom('allAncestors')
.innerJoin(
'pageAccess',
'pageAccess.pageId',
'pageHierarchy.ancestorId',
'allAncestors.ancestorId',
)
.leftJoin('pagePermissions', (join) =>
join
@@ -621,16 +689,13 @@ export class PagePermissionRepo {
eb(
'pagePermissions.groupId',
'in',
eb
.selectFrom('groupUsers')
.select('groupUsers.groupId')
.where('groupUsers.userId', '=', userId),
this.userGroupIdsSubquery(eb, userId),
),
]),
),
)
.select('pageAccess.pageId')
.whereRef('pageHierarchy.descendantId', '=', 'pages.id')
.whereRef('allAncestors.pageId', '=', 'pages.id')
.where('pagePermissions.id', 'is', null),
),
),
@@ -646,10 +711,21 @@ export class PagePermissionRepo {
*/
async hasRestrictedAncestor(pageId: string): Promise<boolean> {
const result = await this.db
.selectFrom('pageHierarchy')
.innerJoin('pageAccess', 'pageAccess.pageId', 'pageHierarchy.ancestorId')
.withRecursive('ancestors', (qb) =>
qb
.selectFrom('pages')
.select(['pages.id as ancestorId', 'pages.parentPageId'])
.where('pages.id', '=', pageId)
.unionAll((eb) =>
eb
.selectFrom('pages')
.innerJoin('ancestors', 'ancestors.parentPageId', 'pages.id')
.select(['pages.id as ancestorId', 'pages.parentPageId']),
),
)
.selectFrom('ancestors')
.innerJoin('pageAccess', 'pageAccess.pageId', 'ancestors.ancestorId')
.select('pageAccess.id')
.where('pageHierarchy.descendantId', '=', pageId)
.executeTakeFirst();
return !!result;
@@ -688,6 +764,31 @@ export class PagePermissionRepo {
if (parentIds.length === 0) return [];
const results = await this.db
.withRecursive('childAncestors', (qb) =>
qb
.selectFrom('pages as child')
.select([
'child.id as childId',
'child.id as ancestorId',
'child.parentPageId as ancestorParentId',
])
.where('child.parentPageId', 'in', parentIds)
.where('child.deletedAt', 'is', null)
.unionAll((eb) =>
eb
.selectFrom('pages')
.innerJoin(
'childAncestors',
'childAncestors.ancestorParentId',
'pages.id',
)
.select([
'childAncestors.childId',
'pages.id as ancestorId',
'pages.parentPageId as ancestorParentId',
]),
),
)
.selectFrom('pages as child')
.select('child.parentPageId')
.distinct()
@@ -696,11 +797,11 @@ export class PagePermissionRepo {
.where(({ not, exists, selectFrom }) =>
not(
exists(
selectFrom('pageHierarchy')
selectFrom('childAncestors')
.innerJoin(
'pageAccess',
'pageAccess.pageId',
'pageHierarchy.ancestorId',
'childAncestors.ancestorId',
)
.leftJoin('pagePermissions', (join) =>
join
@@ -711,16 +812,13 @@ export class PagePermissionRepo {
eb(
'pagePermissions.groupId',
'in',
eb
.selectFrom('groupUsers')
.select('groupUsers.groupId')
.where('groupUsers.userId', '=', userId),
this.userGroupIdsSubquery(eb, userId),
),
]),
),
)
.select('pageAccess.pageId')
.whereRef('pageHierarchy.descendantId', '=', 'child.id')
.whereRef('childAncestors.childId', '=', 'child.id')
.where('pagePermissions.id', 'is', null),
),
),
@@ -737,27 +835,67 @@ export class PagePermissionRepo {
*/
async getRestrictedSubtreeIds(rootPageId: string): Promise<string[]> {
const results = await this.db
.selectFrom('pageHierarchy as subtree')
.where('subtree.ancestorId', '=', rootPageId)
.innerJoin(
(eb) =>
eb
.selectFrom('pageHierarchy as inner')
.innerJoin('pageAccess', 'pageAccess.pageId', 'inner.ancestorId')
.select('inner.descendantId as restrictedDescendant')
.distinct()
.as('restricted'),
(join) =>
join.onRef(
'restricted.restrictedDescendant',
'=',
'subtree.descendantId',
.withRecursive('descendants', (qb) =>
qb
.selectFrom('pages')
.select(['pages.id as descendantId', 'pages.parentPageId'])
.where('pages.id', '=', rootPageId)
.unionAll((eb) =>
eb
.selectFrom('pages')
.innerJoin(
'descendants',
'descendants.descendantId',
'pages.parentPageId',
)
.select(['pages.id as descendantId', 'pages.parentPageId'])
.where('pages.deletedAt', 'is', null),
),
)
.select('subtree.descendantId')
.withRecursive('descendantAncestors', (qb) =>
qb
.selectFrom('descendants')
.innerJoin('pages', 'pages.id', 'descendants.descendantId')
.select([
'descendants.descendantId',
'pages.id as ancestorId',
'pages.parentPageId as ancestorParentId',
])
.unionAll((eb) =>
eb
.selectFrom('pages')
.innerJoin(
'descendantAncestors',
'descendantAncestors.ancestorParentId',
'pages.id',
)
.select([
'descendantAncestors.descendantId',
'pages.id as ancestorId',
'pages.parentPageId as ancestorParentId',
]),
),
)
.selectFrom('descendantAncestors')
.innerJoin(
'pageAccess',
'pageAccess.pageId',
'descendantAncestors.ancestorId',
)
.select('descendantAncestors.descendantId')
.distinct()
.execute();
return results.map((r) => r.descendantId);
}
private userGroupIdsSubquery(
eb: ExpressionBuilder<any, keyof DB>,
userId: string,
) {
return eb
.selectFrom('groupUsers')
.select('groupUsers.groupId')
.where('groupUsers.userId', '=', userId);
}
}
@@ -6,7 +6,6 @@ export enum QueueName {
FILE_TASK_QUEUE = '{file-task-queue}',
SEARCH_QUEUE = '{search-queue}',
AI_QUEUE = '{ai-queue}',
HIERARCHY_QUEUE = '{hierarchy-queue}',
}
export enum QueueJob {
@@ -59,8 +58,4 @@ export enum QueueJob {
GENERATE_PAGE_EMBEDDINGS = 'generate-page-embeddings',
DELETE_PAGE_EMBEDDINGS = 'delete-page-embeddings',
REBUILD_HIERARCHY_ALL = 'rebuild-hierarchy-all',
REBUILD_HIERARCHY_ALL_BY_SPACE = 'rebuild-hierarchy-all-by-space',
REBUILD_HIERARCHY_SPACE = 'rebuild-hierarchy-space',
}
@@ -1,5 +1,4 @@
import { MentionNode } from "../../../common/helpers/prosemirror/utils";
import { MentionNode } from '../../../common/helpers/prosemirror/utils';
export interface IPageBacklinkJob {
pageId: string;
@@ -9,8 +8,4 @@ export interface IPageBacklinkJob {
export interface IStripeSeatsSyncJob {
workspaceId: string;
}
export interface IRebuildHierarchyJob {
spaceId?: string;
}
@@ -1,134 +0,0 @@
import { Logger, OnModuleDestroy } from '@nestjs/common';
import { OnWorkerEvent, Processor, WorkerHost } from '@nestjs/bullmq';
import { Job } from 'bullmq';
import { InjectKysely } from 'nestjs-kysely';
import { KyselyDB } from '@docmost/db/types/kysely.types';
import { QueueJob, QueueName } from '../constants';
import { IRebuildHierarchyJob } from '../constants/queue.interface';
import { PageHierarchyRepo } from '@docmost/db/repos/page/page-hierarchy.repo';
import { executeTx } from '@docmost/db/utils';
@Processor(QueueName.HIERARCHY_QUEUE)
export class HierarchyProcessor extends WorkerHost implements OnModuleDestroy {
private readonly logger = new Logger(HierarchyProcessor.name);
constructor(
@InjectKysely() private readonly db: KyselyDB,
private readonly pageHierarchyRepo: PageHierarchyRepo,
) {
super();
}
async process(job: Job<IRebuildHierarchyJob, void>): Promise<void> {
try {
switch (job.name) {
case QueueJob.REBUILD_HIERARCHY_ALL:
await this.rebuildAll();
break;
case QueueJob.REBUILD_HIERARCHY_ALL_BY_SPACE:
await this.rebuildAllBySpace();
break;
case QueueJob.REBUILD_HIERARCHY_SPACE:
if (!job.data.spaceId) {
throw new Error('spaceId is required for space rebuild');
}
await this.rebuildBySpace(job.data.spaceId);
break;
}
} catch (err) {
throw err;
}
}
private async rebuildAll(): Promise<void> {
await executeTx(this.db, async (trx) => {
const locked = await this.pageHierarchyRepo.tryAcquireGlobalLock(trx);
if (!locked) {
this.logger.debug(
'Rebuild all skipped - another process holds the lock',
);
return;
}
this.logger.debug('Rebuilding hierarchy for all pages');
const count = await this.pageHierarchyRepo.rebuildAll(trx);
this.logger.debug(`Rebuilt hierarchy for all pages (${count} entries)`);
});
}
private async rebuildBySpace(spaceId: string): Promise<void> {
await executeTx(this.db, async (trx) => {
const locked = await this.pageHierarchyRepo.tryAcquireSpaceLock(
spaceId,
trx,
);
if (!locked) {
this.logger.debug(
`Rebuild for space ${spaceId} skipped - another process holds the lock`,
);
return;
}
this.logger.debug(`Rebuilding hierarchy for space ${spaceId}`);
const count = await this.pageHierarchyRepo.rebuildBySpace(spaceId, trx);
this.logger.debug(
`Rebuilt hierarchy for space ${spaceId} (${count} entries)`,
);
});
}
private async rebuildAllBySpace(): Promise<void> {
let lastId: string | null = null;
const BATCH_SIZE = 100;
let totalSpaces = 0;
this.logger.debug('Starting hierarchy rebuild for all spaces');
while (true) {
const spaces = await this.db
.selectFrom('spaces')
.select('id')
.$if(Boolean(lastId), (qb) => qb.where('id', '>', lastId!))
.orderBy('id', 'asc')
.limit(BATCH_SIZE)
.execute();
if (spaces.length === 0) break;
for (const space of spaces) {
await this.rebuildBySpace(space.id);
totalSpaces++;
}
lastId = spaces[spaces.length - 1].id;
this.logger.debug(`Rebuilt hierarchy for ${totalSpaces} spaces...`);
}
this.logger.debug(`Completed hierarchy rebuild for ${totalSpaces} spaces`);
}
@OnWorkerEvent('active')
onActive(job: Job) {
this.logger.debug(`Processing ${job.name} job`);
}
@OnWorkerEvent('failed')
onError(job: Job) {
this.logger.error(
`Error processing ${job.name} job. Reason: ${job.failedReason}`,
);
}
@OnWorkerEvent('completed')
onCompleted(job: Job) {
this.logger.debug(`Completed ${job.name} job`);
}
async onModuleDestroy(): Promise<void> {
if (this.worker) {
await this.worker.close();
}
}
}
@@ -4,7 +4,6 @@ import { EnvironmentService } from '../environment/environment.service';
import { createRetryStrategy, parseRedisUrl } from '../../common/helpers';
import { QueueName } from './constants';
import { BacklinksProcessor } from './processors/backlinks.processor';
import { HierarchyProcessor } from './processors/hierarchy.processor';
@Global()
@Module({
@@ -74,22 +73,8 @@ import { HierarchyProcessor } from './processors/hierarchy.processor';
attempts: 1,
},
}),
BullModule.registerQueue({
name: QueueName.HIERARCHY_QUEUE,
defaultJobOptions: {
removeOnComplete: true,
removeOnFail: {
count: 50,
},
attempts: 3,
backoff: {
type: 'exponential',
delay: 20 * 1000,
},
},
}),
],
exports: [BullModule],
providers: [BacklinksProcessor, HierarchyProcessor],
providers: [BacklinksProcessor],
})
export class QueueModule {}