mirror of
https://github.com/docmost/docmost.git
synced 2026-05-07 06:23:06 +08:00
perf(server): skip per-request row count when collection is resident
This commit is contained in:
@@ -279,6 +279,13 @@ export class BaseQueryCacheService
|
|||||||
return this.collections.size;
|
return this.collections.size;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Production-facing fast path for the router: returns the resident
|
||||||
|
// collection without triggering a load. Used to avoid a per-request
|
||||||
|
// Postgres COUNT when the cached rowCount already answers the question.
|
||||||
|
peek(baseId: string): LoadedCollection | undefined {
|
||||||
|
return this.collections.get(baseId);
|
||||||
|
}
|
||||||
|
|
||||||
/*
|
/*
|
||||||
* Apply a change envelope received from Redis pub/sub to the local
|
* Apply a change envelope received from Redis pub/sub to the local
|
||||||
* collection (if any). Rows that target bases not resident on this node
|
* collection (if any). Rows that target bases not resident on this node
|
||||||
@@ -300,12 +307,15 @@ export class BaseQueryCacheService
|
|||||||
return;
|
return;
|
||||||
case 'row-upsert':
|
case 'row-upsert':
|
||||||
await this.upsertRow(collection, env.row);
|
await this.upsertRow(collection, env.row);
|
||||||
|
await this.refreshRowCount(collection);
|
||||||
return;
|
return;
|
||||||
case 'row-delete':
|
case 'row-delete':
|
||||||
await this.deleteRow(collection, env.rowId);
|
await this.deleteRow(collection, env.rowId);
|
||||||
|
await this.refreshRowCount(collection);
|
||||||
return;
|
return;
|
||||||
case 'rows-delete':
|
case 'rows-delete':
|
||||||
for (const id of env.rowIds) await this.deleteRow(collection, id);
|
for (const id of env.rowIds) await this.deleteRow(collection, id);
|
||||||
|
await this.refreshRowCount(collection);
|
||||||
return;
|
return;
|
||||||
case 'row-reorder':
|
case 'row-reorder':
|
||||||
await this.updatePosition(collection, env.rowId, env.position);
|
await this.updatePosition(collection, env.rowId, env.position);
|
||||||
@@ -321,6 +331,19 @@ export class BaseQueryCacheService
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
private async refreshRowCount(collection: LoadedCollection): Promise<void> {
|
||||||
|
try {
|
||||||
|
const res = await collection.connection.runAndReadAll(
|
||||||
|
'SELECT count(*) AS c FROM rows',
|
||||||
|
);
|
||||||
|
const row = res.getRowObjects()[0] as { c: bigint | number };
|
||||||
|
collection.rowCount = Number(row.c);
|
||||||
|
} catch {
|
||||||
|
// swallow — stale rowCount drifts at most by the size of the burst; the
|
||||||
|
// next reload-from-Postgres or pubsub event corrects it.
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
private async upsertRow(
|
private async upsertRow(
|
||||||
collection: LoadedCollection,
|
collection: LoadedCollection,
|
||||||
row: Record<string, unknown>,
|
row: Record<string, unknown>,
|
||||||
|
|||||||
@@ -1,6 +1,7 @@
|
|||||||
import { BaseQueryRouter } from './base-query-router';
|
import { BaseQueryRouter } from './base-query-router';
|
||||||
import { QueryCacheConfigProvider } from './query-cache.config';
|
import { QueryCacheConfigProvider } from './query-cache.config';
|
||||||
import { BaseRowRepo } from '@docmost/db/repos/base/base-row.repo';
|
import { BaseRowRepo } from '@docmost/db/repos/base/base-row.repo';
|
||||||
|
import { BaseQueryCacheService } from './base-query-cache.service';
|
||||||
import { FilterNode, SearchSpec, SortSpec } from '../engine';
|
import { FilterNode, SearchSpec, SortSpec } from '../engine';
|
||||||
|
|
||||||
type FakeConfig = { enabled: boolean; minRows: number };
|
type FakeConfig = { enabled: boolean; minRows: number };
|
||||||
@@ -21,8 +22,13 @@ function makeRouter(
|
|||||||
const countSpy = jest.fn().mockResolvedValue(count);
|
const countSpy = jest.fn().mockResolvedValue(count);
|
||||||
const baseRowRepo = { countActiveRows: countSpy } as unknown as BaseRowRepo;
|
const baseRowRepo = { countActiveRows: countSpy } as unknown as BaseRowRepo;
|
||||||
|
|
||||||
|
// Default fake: always miss, so `decide` falls through to countActiveRows.
|
||||||
|
const fakeCacheService = {
|
||||||
|
peek: () => undefined,
|
||||||
|
} as unknown as BaseQueryCacheService;
|
||||||
|
|
||||||
return {
|
return {
|
||||||
router: new BaseQueryRouter(configProvider, baseRowRepo),
|
router: new BaseQueryRouter(configProvider, baseRowRepo, fakeCacheService),
|
||||||
countSpy,
|
countSpy,
|
||||||
};
|
};
|
||||||
}
|
}
|
||||||
@@ -98,4 +104,56 @@ describe('BaseQueryRouter.decide', () => {
|
|||||||
const decision = await router.decide({ ...baseArgs, search: trgmSearch });
|
const decision = await router.decide({ ...baseArgs, search: trgmSearch });
|
||||||
expect(decision).toBe('postgres');
|
expect(decision).toBe('postgres');
|
||||||
});
|
});
|
||||||
|
|
||||||
|
it('uses cached row count from resident collection (no Postgres call)', async () => {
|
||||||
|
const countSpy = jest.fn().mockResolvedValue(999999); // shouldn't be called
|
||||||
|
const cacheService = {
|
||||||
|
peek: jest.fn().mockReturnValue({ baseId: 'base-1', rowCount: 50_000 }),
|
||||||
|
} as unknown as BaseQueryCacheService;
|
||||||
|
const router = new BaseQueryRouter(
|
||||||
|
{
|
||||||
|
config: {
|
||||||
|
enabled: true,
|
||||||
|
minRows: 25_000,
|
||||||
|
maxCollections: 10,
|
||||||
|
warmTopN: 0,
|
||||||
|
},
|
||||||
|
} as unknown as QueryCacheConfigProvider,
|
||||||
|
{ countActiveRows: countSpy } as unknown as BaseRowRepo,
|
||||||
|
cacheService,
|
||||||
|
);
|
||||||
|
const decision = await router.decide({
|
||||||
|
...baseArgs,
|
||||||
|
sorts,
|
||||||
|
});
|
||||||
|
expect(decision).toBe('cache');
|
||||||
|
expect((cacheService.peek as jest.Mock)).toHaveBeenCalledWith('base-1');
|
||||||
|
expect(countSpy).not.toHaveBeenCalled();
|
||||||
|
});
|
||||||
|
|
||||||
|
it('falls back to Postgres count when collection is not resident', async () => {
|
||||||
|
const countSpy = jest.fn().mockResolvedValue(30_000);
|
||||||
|
const cacheService = {
|
||||||
|
peek: jest.fn().mockReturnValue(undefined),
|
||||||
|
} as unknown as BaseQueryCacheService;
|
||||||
|
const router = new BaseQueryRouter(
|
||||||
|
{
|
||||||
|
config: {
|
||||||
|
enabled: true,
|
||||||
|
minRows: 25_000,
|
||||||
|
maxCollections: 10,
|
||||||
|
warmTopN: 0,
|
||||||
|
},
|
||||||
|
} as unknown as QueryCacheConfigProvider,
|
||||||
|
{ countActiveRows: countSpy } as unknown as BaseRowRepo,
|
||||||
|
cacheService,
|
||||||
|
);
|
||||||
|
const decision = await router.decide({
|
||||||
|
...baseArgs,
|
||||||
|
sorts,
|
||||||
|
});
|
||||||
|
expect(decision).toBe('cache');
|
||||||
|
expect((cacheService.peek as jest.Mock)).toHaveBeenCalledWith('base-1');
|
||||||
|
expect(countSpy).toHaveBeenCalledWith('base-1', { workspaceId: 'ws-1' });
|
||||||
|
});
|
||||||
});
|
});
|
||||||
|
|||||||
@@ -3,6 +3,7 @@ import { QueryCacheConfigProvider } from './query-cache.config';
|
|||||||
import { BaseRowRepo } from '@docmost/db/repos/base/base-row.repo';
|
import { BaseRowRepo } from '@docmost/db/repos/base/base-row.repo';
|
||||||
import type { FilterNode, SearchSpec, SortSpec } from '../engine';
|
import type { FilterNode, SearchSpec, SortSpec } from '../engine';
|
||||||
import { EnvironmentService } from '../../../integrations/environment/environment.service';
|
import { EnvironmentService } from '../../../integrations/environment/environment.service';
|
||||||
|
import { BaseQueryCacheService } from './base-query-cache.service';
|
||||||
|
|
||||||
export type RouteDecision = 'postgres' | 'cache';
|
export type RouteDecision = 'postgres' | 'cache';
|
||||||
|
|
||||||
@@ -19,6 +20,7 @@ export class BaseQueryRouter {
|
|||||||
constructor(
|
constructor(
|
||||||
private readonly configProvider: QueryCacheConfigProvider,
|
private readonly configProvider: QueryCacheConfigProvider,
|
||||||
private readonly baseRowRepo: BaseRowRepo,
|
private readonly baseRowRepo: BaseRowRepo,
|
||||||
|
private readonly cacheService: BaseQueryCacheService,
|
||||||
@Optional() private readonly env: EnvironmentService | null = null,
|
@Optional() private readonly env: EnvironmentService | null = null,
|
||||||
) {}
|
) {}
|
||||||
|
|
||||||
@@ -34,6 +36,26 @@ export class BaseQueryRouter {
|
|||||||
// v1: any search stays on Postgres — loader doesn't populate search_text yet.
|
// v1: any search stays on Postgres — loader doesn't populate search_text yet.
|
||||||
if (hasSearch) return 'postgres';
|
if (hasSearch) return 'postgres';
|
||||||
|
|
||||||
|
// Fast path: if the collection is already resident, read the cached
|
||||||
|
// row count instead of running a Postgres COUNT on every request.
|
||||||
|
const resident = this.cacheService.peek(args.baseId);
|
||||||
|
if (resident) {
|
||||||
|
const debug = this.env?.getBaseQueryCacheDebug() ?? false;
|
||||||
|
if (debug) {
|
||||||
|
console.log(
|
||||||
|
'[cache-perf]',
|
||||||
|
JSON.stringify({
|
||||||
|
phase: 'router.residentCount',
|
||||||
|
baseId: args.baseId.slice(0, 8),
|
||||||
|
count: resident.rowCount,
|
||||||
|
minRows,
|
||||||
|
}),
|
||||||
|
);
|
||||||
|
}
|
||||||
|
if (resident.rowCount < minRows) return 'postgres';
|
||||||
|
return 'cache';
|
||||||
|
}
|
||||||
|
|
||||||
const debug = this.env?.getBaseQueryCacheDebug() ?? false;
|
const debug = this.env?.getBaseQueryCacheDebug() ?? false;
|
||||||
const tCount = debug ? Date.now() : 0;
|
const tCount = debug ? Date.now() : 0;
|
||||||
const count = await this.baseRowRepo.countActiveRows(args.baseId, {
|
const count = await this.baseRowRepo.countActiveRows(args.baseId, {
|
||||||
|
|||||||
@@ -117,6 +117,13 @@ export class CollectionLoader {
|
|||||||
`Loaded ${rowCount} rows for base ${baseId} (schemaVersion=${schemaVersion})`,
|
`Loaded ${rowCount} rows for base ${baseId} (schemaVersion=${schemaVersion})`,
|
||||||
);
|
);
|
||||||
|
|
||||||
|
const countResult = await connection.runAndReadAll(
|
||||||
|
'SELECT count(*) AS c FROM rows',
|
||||||
|
);
|
||||||
|
const cachedRowCount = Number(
|
||||||
|
(countResult.getRowObjects()[0] as { c: bigint | number }).c,
|
||||||
|
);
|
||||||
|
|
||||||
return {
|
return {
|
||||||
baseId,
|
baseId,
|
||||||
schemaVersion,
|
schemaVersion,
|
||||||
@@ -124,6 +131,7 @@ export class CollectionLoader {
|
|||||||
instance,
|
instance,
|
||||||
connection,
|
connection,
|
||||||
lastAccessedAt: Date.now(),
|
lastAccessedAt: Date.now(),
|
||||||
|
rowCount: cachedRowCount,
|
||||||
};
|
};
|
||||||
} catch (err) {
|
} catch (err) {
|
||||||
if (appender) {
|
if (appender) {
|
||||||
|
|||||||
@@ -27,6 +27,8 @@ export type LoadedCollection = {
|
|||||||
instance: DuckDBInstance;
|
instance: DuckDBInstance;
|
||||||
connection: DuckDBConnection;
|
connection: DuckDBConnection;
|
||||||
lastAccessedAt: number;
|
lastAccessedAt: number;
|
||||||
|
// cached; set by loader, maintained by applyChange
|
||||||
|
rowCount: number;
|
||||||
};
|
};
|
||||||
|
|
||||||
export type ChangeEnvelope =
|
export type ChangeEnvelope =
|
||||||
|
|||||||
Reference in New Issue
Block a user