feat(server): route large base list queries through the duckdb cache

This commit is contained in:
Philipinho
2026-04-19 21:46:27 +01:00
parent 45000bbd8b
commit cf6b48cd58
3 changed files with 166 additions and 6 deletions
@@ -0,0 +1,101 @@
import { BaseQueryRouter } from './base-query-router';
import { QueryCacheConfigProvider } from './query-cache.config';
import { BaseRowRepo } from '@docmost/db/repos/base/base-row.repo';
import { FilterNode, SearchSpec, SortSpec } from '../engine';
type FakeConfig = { enabled: boolean; minRows: number };
function makeRouter(
cfg: FakeConfig,
count: number,
): { router: BaseQueryRouter; countSpy: jest.Mock } {
const configProvider = {
config: {
enabled: cfg.enabled,
minRows: cfg.minRows,
maxCollections: 10,
warmTopN: 0,
},
} as unknown as QueryCacheConfigProvider;
const countSpy = jest.fn().mockResolvedValue(count);
const baseRowRepo = { countActiveRows: countSpy } as unknown as BaseRowRepo;
return {
router: new BaseQueryRouter(configProvider, baseRowRepo),
countSpy,
};
}
const filter: FilterNode = {
op: 'and',
children: [
{
propertyId: 'p1',
op: 'eq',
value: 'foo',
},
],
};
const sorts: SortSpec[] = [{ propertyId: 'p1', direction: 'asc' }];
const trgmSearch: SearchSpec = { query: 'hello', mode: 'trgm' };
const ftsSearch: SearchSpec = { query: 'hello', mode: 'fts' };
const baseArgs = {
baseId: 'base-1',
workspaceId: 'ws-1',
};
describe('BaseQueryRouter.decide', () => {
it('returns postgres when flag is off', async () => {
const { router, countSpy } = makeRouter(
{ enabled: false, minRows: 10 },
1000,
);
const decision = await router.decide({ ...baseArgs, filter });
expect(decision).toBe('postgres');
expect(countSpy).not.toHaveBeenCalled();
});
it('returns postgres when row count < minRows', async () => {
const { router } = makeRouter({ enabled: true, minRows: 1000 }, 500);
const decision = await router.decide({ ...baseArgs, filter });
expect(decision).toBe('postgres');
});
it('returns postgres when query has no filter/sort/search', async () => {
const { router, countSpy } = makeRouter(
{ enabled: true, minRows: 10 },
10000,
);
const decision = await router.decide({ ...baseArgs });
expect(decision).toBe('postgres');
expect(countSpy).not.toHaveBeenCalled();
});
it('returns postgres when search.mode === "fts" even for large base', async () => {
const { router } = makeRouter({ enabled: true, minRows: 10 }, 10000);
const decision = await router.decide({ ...baseArgs, search: ftsSearch });
expect(decision).toBe('postgres');
});
it('returns cache when flag on + rows >= minRows + has filter', async () => {
const { router } = makeRouter({ enabled: true, minRows: 1000 }, 1000);
const decision = await router.decide({ ...baseArgs, filter });
expect(decision).toBe('cache');
});
it('returns cache when flag on + rows >= minRows + has sort', async () => {
const { router } = makeRouter({ enabled: true, minRows: 1000 }, 5000);
const decision = await router.decide({ ...baseArgs, sorts });
expect(decision).toBe('cache');
});
it('returns postgres when flag on + rows >= minRows + has trgm search (v1 gates search to postgres)', async () => {
const { router } = makeRouter({ enabled: true, minRows: 10 }, 10000);
const decision = await router.decide({ ...baseArgs, search: trgmSearch });
expect(decision).toBe('postgres');
});
});
@@ -1,16 +1,43 @@
import { Injectable } from '@nestjs/common';
import { QueryCacheConfigProvider } from './query-cache.config';
import { BaseRowRepo } from '@docmost/db/repos/base/base-row.repo';
import type { FilterNode, SearchSpec, SortSpec } from '../engine';
export type RouteDecision = 'postgres' | 'cache';
export type RouteDecideArgs = {
baseId: string;
workspaceId: string;
filter?: FilterNode;
sorts?: SortSpec[];
search?: SearchSpec;
};
@Injectable()
export class BaseQueryRouter {
constructor(private readonly configProvider: QueryCacheConfigProvider) {}
constructor(
private readonly configProvider: QueryCacheConfigProvider,
private readonly baseRowRepo: BaseRowRepo,
) {}
// Stubbed: routes always to postgres in this commit so the existing
// behavior is preserved. Real decision logic is added in task 6.
decide(_args: unknown): RouteDecision {
if (!this.configProvider.config.enabled) return 'postgres';
return 'postgres';
async decide(args: RouteDecideArgs): Promise<RouteDecision> {
const { enabled, minRows } = this.configProvider.config;
if (!enabled) return 'postgres';
const hasFilter = !!args.filter;
const hasSorts = !!args.sorts && args.sorts.length > 0;
const hasSearch = !!args.search;
if (!hasFilter && !hasSorts && !hasSearch) return 'postgres';
// v1: any search stays on Postgres. Trgm search also stays on Postgres
// until the loader populates `search_text`; re-evaluate after that lands.
if (args.search) return 'postgres';
const count = await this.baseRowRepo.countActiveRows(args.baseId, {
workspaceId: args.workspaceId,
});
if (count < minRows) return 'postgres';
return 'cache';
}
}
@@ -1,6 +1,7 @@
import {
BadRequestException,
Injectable,
Logger,
NotFoundException,
} from '@nestjs/common';
import { InjectKysely } from 'nestjs-kysely';
@@ -9,6 +10,8 @@ import { KyselyDB } from '@docmost/db/types/kysely.types';
import { BaseRowRepo } from '@docmost/db/repos/base/base-row.repo';
import { BasePropertyRepo } from '@docmost/db/repos/base/base-property.repo';
import { BaseViewRepo } from '@docmost/db/repos/base/base-view.repo';
import { BaseQueryRouter } from '../query-cache/base-query-router';
import { BaseQueryCacheService } from '../query-cache/base-query-cache.service';
import { CreateRowDto } from '../dto/create-row.dto';
import {
UpdateRowDto,
@@ -44,12 +47,16 @@ import {
@Injectable()
export class BaseRowService {
private readonly logger = new Logger(BaseRowService.name);
constructor(
@InjectKysely() private readonly db: KyselyDB,
private readonly baseRowRepo: BaseRowRepo,
private readonly basePropertyRepo: BasePropertyRepo,
private readonly baseViewRepo: BaseViewRepo,
private readonly eventEmitter: EventEmitter2,
private readonly queryRouter: BaseQueryRouter,
private readonly queryCache: BaseQueryCacheService,
) {}
async create(userId: string, workspaceId: string, dto: CreateRowDto) {
@@ -202,6 +209,31 @@ export class BaseRowService {
direction: s.direction,
}));
const decision = await this.queryRouter.decide({
baseId: dto.baseId,
workspaceId,
filter,
sorts,
search,
});
if (decision === 'cache') {
try {
return await this.queryCache.list(dto.baseId, workspaceId, {
filter,
sorts,
search,
schema,
pagination,
});
} catch (err) {
this.logger.warn(
`Cache list failed for base ${dto.baseId}, falling back to Postgres`,
err as Error,
);
}
}
return this.baseRowRepo.list({
baseId: dto.baseId,
workspaceId,