feat(base): add BASE_QUERY_CACHE_TRACE flag for duckdb operation logging

This commit is contained in:
Philipinho
2026-04-23 13:37:25 +01:00
parent b2ed8f9936
commit 2d9e060d9e
6 changed files with 110 additions and 7 deletions
@@ -166,6 +166,18 @@ export class BaseQueryCacheService
},
});
if (this.env?.getBaseQueryCacheTrace?.() ?? false) {
console.log(
'[cache-trace]',
JSON.stringify({
phase: 'query.sql',
baseId: baseId.slice(0, 8),
sql,
params,
}),
);
}
const prepared = await collection.connection.prepare(sql);
for (let i = 0; i < params.length; i++) {
const p = params[i];
@@ -296,6 +308,17 @@ export class BaseQueryCacheService
*/
async applyChange(env: ChangeEnvelope): Promise<void> {
const collection = this.collections.get(env.baseId);
if (this.env?.getBaseQueryCacheTrace?.() ?? false) {
console.log(
'[cache-trace]',
JSON.stringify({
phase: 'pubsub.apply',
baseId: env.baseId.slice(0, 8),
kind: env.kind,
resident: !!collection,
}),
);
}
if (!collection) return;
try {
@@ -146,6 +146,21 @@ export class BaseQueryCacheWriteConsumer {
envelope: ChangeEnvelope,
): Promise<void> {
const channel = `base-query-cache:changes:${baseId}`;
if (this.configProvider.config.trace) {
console.log(
'[cache-trace]',
JSON.stringify({
phase: 'pubsub.publish',
baseId,
kind: envelope.kind,
// Include the row id or similar short discriminator where meaningful,
// but don't dump the full envelope — it can be large (row-upsert ships
// the whole row).
...('rowId' in envelope ? { rowId: envelope.rowId } : {}),
...('rowIds' in envelope ? { rowCount: envelope.rowIds.length } : {}),
}),
);
}
try {
await this.redis.publish(channel, JSON.stringify(envelope));
} catch (err) {
@@ -26,15 +26,34 @@ export class BaseQueryRouter {
async decide(args: RouteDecideArgs): Promise<RouteDecision> {
const { enabled, minRows } = this.configProvider.config;
if (!enabled) return 'postgres';
const trace = this.configProvider.config.trace ?? false;
const emit = (route: RouteDecision, reason: string): RouteDecision => {
if (trace) {
console.log(
'[cache-trace]',
JSON.stringify({
phase: 'router.decision',
baseId: args.baseId,
route,
reason,
}),
);
}
return route;
};
if (!enabled) return emit('postgres', 'flag disabled');
const hasFilter = !!args.filter;
const hasSorts = !!args.sorts && args.sorts.length > 0;
const hasSearch = !!args.search;
if (!hasFilter && !hasSorts && !hasSearch) return 'postgres';
if (!hasFilter && !hasSorts && !hasSearch) {
return emit('postgres', 'no filter/sort/search');
}
// v1: any search stays on Postgres — loader doesn't populate search_text yet.
if (hasSearch) return 'postgres';
if (hasSearch) return emit('postgres', 'search requires postgres');
// Fast path: if the collection is already resident, read the cached
// row count instead of running a Postgres COUNT on every request.
@@ -52,8 +71,16 @@ export class BaseQueryRouter {
}),
);
}
if (resident.rowCount < minRows) return 'postgres';
return 'cache';
if (resident.rowCount < minRows) {
return emit(
'postgres',
`rowCount=${resident.rowCount} below MIN_ROWS=${minRows}`,
);
}
return emit(
'cache',
`qualified: rowCount=${resident.rowCount}, hasFilter=${hasFilter}, hasSort=${hasSorts}`,
);
}
const debug = this.env?.getBaseQueryCacheDebug() ?? false;
@@ -73,8 +100,13 @@ export class BaseQueryRouter {
}),
);
}
if (count < minRows) return 'postgres';
if (count < minRows) {
return emit('postgres', `rowCount=${count} below MIN_ROWS=${minRows}`);
}
return 'cache';
return emit(
'cache',
`qualified: rowCount=${count}, hasFilter=${hasFilter}, hasSort=${hasSorts}`,
);
}
}
@@ -50,6 +50,17 @@ export class CollectionLoader {
// server-side via the base_cell_* helpers; DuckDB streams typed
// columns over COPY BINARY into its vectorized insert path.
const sql = buildLoaderSql(specs, baseId, workspaceId);
if (this.config.config.trace) {
console.log(
'[cache-trace]',
JSON.stringify({
phase: 'loader.sql',
baseId,
length: sql.length,
sql,
}),
);
}
await connection.run(sql);
// Release the PG connection held by the ATTACH — we're done with
@@ -60,9 +71,21 @@ export class CollectionLoader {
for (const spec of specs) {
if (!spec.indexable) continue;
const safe = spec.column.replace(/[^a-zA-Z0-9_]/g, '_');
const tIdx = this.config.config.trace ? Date.now() : 0;
await connection.run(
`CREATE INDEX ${quoteIdent(`idx_${safe}`)} ON rows (${quoteIdent(spec.column)})`,
);
if (this.config.config.trace) {
console.log(
'[cache-trace]',
JSON.stringify({
phase: 'loader.index',
baseId,
column: spec.column,
ms: Date.now() - tIdx,
}),
);
}
}
const countResult = await connection.runAndReadAll(
@@ -8,6 +8,7 @@ export type QueryCacheConfig = {
warmTopN: number;
memoryLimit: string;
threads: number;
trace: boolean;
};
@Injectable()
@@ -21,6 +22,7 @@ export class QueryCacheConfigProvider {
warmTopN: env.getBaseQueryCacheWarmTopN(),
memoryLimit: env.getBaseQueryCacheMemoryLimit(),
threads: env.getBaseQueryCacheThreads(),
trace: env.getBaseQueryCacheTrace(),
};
}
}
@@ -344,6 +344,14 @@ export class EnvironmentService {
);
}
/**
 * Whether verbose `[cache-trace]` logging for the base query cache is
 * enabled. Driven by the BASE_QUERY_CACHE_TRACE environment variable;
 * only the case-insensitive literal "true" turns tracing on — any other
 * value (or an unset variable, which falls back to the 'false' default)
 * leaves it off.
 */
getBaseQueryCacheTrace(): boolean {
  const raw = this.configService.get<string>(
    'BASE_QUERY_CACHE_TRACE',
    'false',
  );
  return raw.toLowerCase() === 'true';
}
getBaseQueryCacheMemoryLimit(): string {
// Per-DuckDB-instance memory ceiling. DuckDB accepts human-readable sizes:
// '32MB', '128MB', '1GB'. Default keeps a single instance from