refactor(base): single duckdb instance with per-base attached databases

This commit is contained in:
Philipinho
2026-04-23 16:40:14 +01:00
parent 4437dcbb62
commit 38cd94b2d7
6 changed files with 325 additions and 682 deletions
@@ -27,6 +27,7 @@ import {
import { QueryCacheConfigProvider } from './query-cache.config';
import { CollectionLoader } from './collection-loader';
import { buildDuckDbListQuery } from './duckdb-query-builder';
import { DuckDbRuntime } from './duckdb-runtime';
import { BasePropertyType } from '../base.schemas';
import {
ChangeEnvelope,
@@ -55,6 +56,7 @@ export class BaseQueryCacheService
private readonly configProvider: QueryCacheConfigProvider,
private readonly baseRepo: BaseRepo,
private readonly collectionLoader: CollectionLoader,
private readonly runtime: DuckDbRuntime,
@Optional() private readonly redisService: RedisService | null = null,
@Optional() private readonly env: EnvironmentService | null = null,
) {}
@@ -62,8 +64,14 @@ export class BaseQueryCacheService
async onApplicationBootstrap(): Promise<void> {
const { enabled, warmTopN } = this.configProvider.config;
if (!enabled) return;
if (!this.runtime.isReady()) {
this.logger.warn('runtime not ready; skipping warm-up');
return;
}
const redis = this.tryGetRedisClient();
if (!redis) return;
try {
const ids = await redis.zrevrange(
'base-query-cache:recent',
@@ -89,41 +97,10 @@ export class BaseQueryCacheService
}
}
private tryGetRedisClient(): Redis | null {
if (!this.redisService) return null;
try {
return this.redisService.getOrNil();
} catch {
return null;
}
}
private recordAccess(baseId: string): void {
if (!this.configProvider.config.enabled) return;
const redis = this.tryGetRedisClient();
if (!redis) return;
const nowMs = Date.now();
const maxKeep = this.configProvider.config.maxCollections * 10;
void (async () => {
try {
await redis.zadd('base-query-cache:recent', nowMs, baseId);
await redis.zremrangebyrank(
'base-query-cache:recent',
0,
-(maxKeep + 1),
);
} catch (err) {
this.logger.debug(
`recordAccess failed for ${baseId}: ${(err as Error).message}`,
);
}
})();
}
async onModuleDestroy(): Promise<void> {
for (const [, collection] of this.collections) {
this.closeCollection(collection);
}
// The runtime owns the instance/connection lifecycle; we just clear
// our metadata. DETACH is a no-op during shutdown because the instance
// is closing anyway.
this.collections.clear();
}
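// Editorial aside, not part of this diff: the recency bookkeeping removed above
// (and re-added lower in the class) is a single Redis sorted set. Writes ZADD
// the baseId scored by wall-clock time and trim everything older than the
// newest `maxKeep` members; warm-up reads the set back newest-first. A minimal
// self-contained sketch of the round trip, assuming ioredis and illustrative
// key/limit values:
import Redis from 'ioredis';

async function recencyRoundTrip(redis: Redis): Promise<string[]> {
  const key = 'base-query-cache:recent';
  const maxKeep = 500; // illustrative; the service derives this from config
  await redis.zadd(key, Date.now(), 'base-a');
  await redis.zadd(key, Date.now() + 1, 'base-b');
  // Ranks 0..-(maxKeep + 1) are the oldest members beyond the retention
  // window; trimming them bounds the set's size.
  await redis.zremrangebyrank(key, 0, -(maxKeep + 1));
  // Warm-up reads the most recently touched bases first.
  return redis.zrevrange(key, 0, 49); // => ['base-b', 'base-a', ...]
}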
@@ -133,6 +110,7 @@ export class BaseQueryCacheService
opts: CacheListOpts,
): Promise<CursorPaginationResult<BaseRow>> {
const debug = this.env?.getBaseQueryCacheDebug() ?? false;
const trace = this.env?.getBaseQueryCacheTrace?.() ?? false;
const tStart = debug ? Date.now() : 0;
const tEnsure = debug ? Date.now() : 0;
@@ -143,9 +121,7 @@ export class BaseQueryCacheService
opts.sorts && opts.sorts.length > 0
? buildSorts(opts.sorts, opts.schema)
: [];
const cursor = makeCursor(sortBuilds, CURSOR_TAIL_KEYS);
const sortFieldKeys = sortBuilds.map((s) => s.key);
const allFieldKeys = [...sortFieldKeys, 'position', 'id'];
@@ -164,21 +140,25 @@ export class BaseQueryCacheService
limit: opts.pagination.limit,
afterKeys: afterKeys as any,
},
schema: collection.schema,
});
if (this.env?.getBaseQueryCacheTrace?.() ?? false) {
if (trace) {
console.log(
'[cache-trace]',
JSON.stringify({
phase: 'query.sql',
baseId: baseId.slice(0, 8),
schema: collection.schema,
sql,
params,
}),
);
}
const prepared = await collection.connection.prepare(sql);
const tExec = debug ? Date.now() : 0;
const duckRows = await this.runtime.withReader(async (conn) => {
const prepared = await conn.prepare(sql);
for (let i = 0; i < params.length; i++) {
const p = params[i];
const oneBased = i + 1;
@@ -196,10 +176,9 @@ export class BaseQueryCacheService
prepared.bindVarchar(oneBased, JSON.stringify(p));
}
}
const tExec = debug ? Date.now() : 0;
const reader = await prepared.runAndReadAll();
const duckRows = reader.getRowObjectsJS();
return reader.getRowObjectsJS();
});
const execMs = debug ? Date.now() - tExec : 0;
const hasNextPage = duckRows.length > opts.pagination.limit;
@@ -231,12 +210,9 @@ export class BaseQueryCacheService
const endRow = duckRows[duckRows.length - 1];
const startRow = duckRows[0];
const encodeFromRow = (raw: Record<string, unknown>): string => {
const entries: Array<[string, unknown]> = [];
for (const sb of sortBuilds) {
entries.push([sb.key, raw[sb.key]]);
}
for (const sb of sortBuilds) entries.push([sb.key, raw[sb.key]]);
entries.push(['position', raw.position]);
entries.push(['id', raw.id]);
return cursor.encodeCursor(entries);
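// Editorial sketch: `makeCursor`/`encodeCursor` are not shown in this diff. A
// keyset cursor of this shape is typically just the ordered sort-key values
// serialized into an opaque token. Assumed encoding, not the project's actual
// implementation:
function encodeCursorSketch(entries: Array<[string, unknown]>): string {
  // Order matters: sort keys first, then the position/id tiebreakers, so the
  // next page's WHERE clause can compare the decoded tuple lexicographically.
  return Buffer.from(JSON.stringify(entries)).toString('base64url');
}

function decodeCursorSketch(token: string): Array<[string, unknown]> {
  return JSON.parse(Buffer.from(token, 'base64url').toString('utf8'));
}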
@@ -276,13 +252,10 @@ export class BaseQueryCacheService
async invalidate(baseId: string): Promise<void> {
const collection = this.collections.get(baseId);
if (!collection) return;
this.closeCollection(collection);
await this.runtime.detachBase(collection.schema);
this.collections.delete(baseId);
}
// Test-only introspection of the resident cache. Used by the LRU eviction
// integration spec to assert which collections are currently loaded without
// reaching into the private `collections` map.
isResident(baseId: string): boolean {
return this.collections.has(baseId);
}
@@ -291,48 +264,38 @@ export class BaseQueryCacheService
return this.collections.size;
}
// Production-facing fast path for the router: returns the resident
// collection without triggering a load. Used to avoid a per-request
// Postgres COUNT when the cached rowCount already answers the question.
peek(baseId: string): LoadedCollection | undefined {
return this.collections.get(baseId);
}
// Returns the memory footprint of every currently resident collection.
residencySnapshot(): Array<{
baseId: string;
schema: string;
rows: number;
heapMb: number;
spilledMb: number;
approxMb: number;
}> {
const out: Array<{
baseId: string;
schema: string;
rows: number;
heapMb: number;
spilledMb: number;
approxMb: number;
}> = [];
for (const [baseId, c] of this.collections) {
out.push({
baseId,
schema: c.schema,
rows: c.rowCount,
heapMb: +(c.heapBytes / (1024 * 1024)).toFixed(1),
spilledMb: +(c.spilledBytes / (1024 * 1024)).toFixed(1),
approxMb: +(c.approxBytes / (1024 * 1024)).toFixed(1),
});
}
return out;
}
/*
* Apply a change envelope received from Redis pub/sub to the local
* collection (if any). Rows that target bases not resident on this node
* are ignored — the next `list` call will load them fresh from Postgres.
* If any patch step throws (e.g. schema drift between this node and the
* publisher) we eagerly invalidate so the next `list` rebuilds cleanly
* rather than serving partial state.
*/
async applyChange(env: ChangeEnvelope): Promise<void> {
const trace = this.env?.getBaseQueryCacheTrace?.() ?? false;
const collection = this.collections.get(env.baseId);
if (this.env?.getBaseQueryCacheTrace?.() ?? false) {
if (trace) {
console.log(
'[cache-trace]',
JSON.stringify({
@@ -343,6 +306,7 @@ export class BaseQueryCacheService
}),
);
}
if (!collection) return;
try {
@@ -378,16 +342,95 @@ export class BaseQueryCacheService
}
}
private async refreshRowCount(collection: LoadedCollection): Promise<void> {
try {
const res = await collection.connection.runAndReadAll(
'SELECT count(*) AS c FROM rows',
);
const row = res.getRowObjects()[0] as { c: bigint | number };
collection.rowCount = Number(row.c);
} catch {
// swallow — stale rowCount drifts at most by the size of the burst; the
// next reload-from-Postgres or pubsub event corrects it.
}
}
private async ensureLoaded(
baseId: string,
workspaceId: string,
): Promise<LoadedCollection> {
const debug = this.env?.getBaseQueryCacheDebug() ?? false;
const existing = this.collections.get(baseId);
const tFind = debug ? Date.now() : 0;
const base = await this.baseRepo.findById(baseId);
const findMs = debug ? Date.now() - tFind : 0;
if (!base) throw new Error(`Base ${baseId} not found`);
const freshVersion = (base as any).schemaVersion ?? 1;
if (existing && existing.schemaVersion === freshVersion) {
existing.lastAccessedAt = Date.now();
this.recordAccess(baseId);
if (debug) {
console.log(
'[cache-perf]',
JSON.stringify({
phase: 'ensureLoaded.hit',
baseId: baseId.slice(0, 8),
findMs,
}),
);
}
return existing;
}
if (existing) {
await this.runtime.detachBase(existing.schema);
this.collections.delete(baseId);
}
const inFlight = this.inFlightLoads.get(baseId);
if (inFlight) {
const loaded = await inFlight;
this.recordAccess(baseId);
return loaded;
}
const tLoad = debug ? Date.now() : 0;
const promise = (async () => {
try {
const { maxCollections } = this.configProvider.config;
if (this.collections.size >= maxCollections) {
await this.evictLru();
}
const loaded = await this.collectionLoader.load(baseId, workspaceId);
this.collections.set(baseId, loaded);
return loaded;
} finally {
this.inFlightLoads.delete(baseId);
}
})();
this.inFlightLoads.set(baseId, promise);
const loaded = await promise;
const loadMs = debug ? Date.now() - tLoad : 0;
this.recordAccess(baseId);
if (debug) {
console.log(
'[cache-perf]',
JSON.stringify({
phase: 'ensureLoaded.miss',
baseId: baseId.slice(0, 8),
findMs,
loadMs,
rows: loaded.rowCount,
approxMb: +(loaded.approxBytes / (1024 * 1024)).toFixed(1),
}),
);
}
return loaded;
}
private async evictLru(): Promise<void> {
let oldestKey: string | null = null;
let oldestTime = Number.POSITIVE_INFINITY;
for (const [key, col] of this.collections) {
if (col.lastAccessedAt < oldestTime) {
oldestTime = col.lastAccessedAt;
oldestKey = key;
}
}
if (oldestKey) {
const col = this.collections.get(oldestKey)!;
await this.runtime.detachBase(col.schema);
this.collections.delete(oldestKey);
this.logger.debug(`Evicted LRU collection ${oldestKey}`);
}
}
@@ -398,9 +441,10 @@ export class BaseQueryCacheService
const specs = collection.columns;
const columnList = specs.map((s) => quoteIdent(s.column)).join(', ');
const placeholders = specs.map(() => '?').join(', ');
const sql = `INSERT OR REPLACE INTO rows (${columnList}) VALUES (${placeholders})`;
const sql = `INSERT OR REPLACE INTO ${collection.schema}.rows (${columnList}) VALUES (${placeholders})`;
const prepared = await collection.connection.prepare(sql);
const writer = this.runtime.getWriter();
const prepared = await writer.prepare(sql);
for (let i = 0; i < specs.length; i++) {
const spec = specs[i];
const oneBased = i + 1;
@@ -440,8 +484,9 @@ export class BaseQueryCacheService
collection: LoadedCollection,
rowId: string,
): Promise<void> {
const prepared = await collection.connection.prepare(
'DELETE FROM rows WHERE id = ?',
const writer = this.runtime.getWriter();
const prepared = await writer.prepare(
`DELETE FROM ${collection.schema}.rows WHERE id = ?`,
);
prepared.bindVarchar(1, rowId);
await prepared.run();
@@ -452,239 +497,133 @@ export class BaseQueryCacheService
rowId: string,
position: string,
): Promise<void> {
const prepared = await collection.connection.prepare(
'UPDATE rows SET position = ? WHERE id = ?',
const writer = this.runtime.getWriter();
const prepared = await writer.prepare(
`UPDATE ${collection.schema}.rows SET position = ? WHERE id = ?`,
);
prepared.bindVarchar(1, position);
prepared.bindVarchar(2, rowId);
await prepared.run();
}
private async ensureLoaded(
baseId: string,
workspaceId: string,
): Promise<LoadedCollection> {
const debug = this.env?.getBaseQueryCacheDebug() ?? false;
// TODO(task-7): remove per-request findById once pub/sub invalidation
// keeps collections in sync with schema bumps.
const existing = this.collections.get(baseId);
const tFind = debug ? Date.now() : 0;
const base = await this.baseRepo.findById(baseId);
const findMs = debug ? Date.now() - tFind : 0;
if (!base) {
throw new Error(`Base ${baseId} not found`);
}
const freshVersion = (base as any).schemaVersion ?? 1;
if (existing && existing.schemaVersion === freshVersion) {
existing.lastAccessedAt = Date.now();
this.recordAccess(baseId);
if (debug) {
console.log(
'[cache-perf]',
JSON.stringify({
phase: 'ensureLoaded.hit',
baseId: baseId.slice(0, 8),
findMs,
}),
);
}
return existing;
}
if (existing) {
this.closeCollection(existing);
this.collections.delete(baseId);
}
const inFlight = this.inFlightLoads.get(baseId);
if (inFlight) {
const loaded = await inFlight;
this.recordAccess(baseId);
return loaded;
}
const tLoad = debug ? Date.now() : 0;
const promise = (async () => {
try {
const { maxCollections } = this.configProvider.config;
if (this.collections.size >= maxCollections) {
this.evictLru();
}
const loaded = await this.collectionLoader.load(baseId, workspaceId);
this.collections.set(baseId, loaded);
return loaded;
} finally {
this.inFlightLoads.delete(baseId);
}
})();
this.inFlightLoads.set(baseId, promise);
const loaded = await promise;
const loadMs = debug ? Date.now() - tLoad : 0;
this.recordAccess(baseId);
if (debug) {
console.log(
'[cache-perf]',
JSON.stringify({
phase: 'ensureLoaded.miss',
baseId: baseId.slice(0, 8),
findMs,
loadMs,
rows: loaded.rowCount,
heapMb: +(loaded.heapBytes / (1024 * 1024)).toFixed(1),
spilledMb: +(loaded.spilledBytes / (1024 * 1024)).toFixed(1),
}),
);
}
return loaded;
}
private evictLru(): void {
let oldestKey: string | null = null;
let oldestTime = Number.POSITIVE_INFINITY;
for (const [key, col] of this.collections) {
if (col.lastAccessedAt < oldestTime) {
oldestTime = col.lastAccessedAt;
oldestKey = key;
}
}
if (oldestKey) {
const col = this.collections.get(oldestKey)!;
this.closeCollection(col);
this.collections.delete(oldestKey);
this.logger.debug(`Evicted LRU collection ${oldestKey}`);
}
}
private closeCollection(collection: LoadedCollection): void {
try {
collection.connection.closeSync();
} catch (err) {
this.logger.warn(`Failed to close connection: ${(err as Error).message}`);
}
try {
collection.instance.closeSync();
} catch (err) {
this.logger.warn(`Failed to close instance: ${(err as Error).message}`);
}
}
private async refreshRowCount(collection: LoadedCollection): Promise<void> {
try {
const res = await this.runtime.getWriter().runAndReadAll(
`SELECT count(*) AS c FROM ${collection.schema}.rows`,
);
const row = res.getRowObjects()[0] as { c: bigint | number };
collection.rowCount = Number(row.c);
} catch {
// stale rowCount self-corrects on next reload
}
}
private recordAccess(baseId: string): void {
if (!this.configProvider.config.enabled) return;
const redis = this.tryGetRedisClient();
if (!redis) return;
const nowMs = Date.now();
const maxKeep = this.configProvider.config.maxCollections * 10;
void (async () => {
try {
await redis.zadd('base-query-cache:recent', nowMs, baseId);
await redis.zremrangebyrank(
'base-query-cache:recent',
0,
-(maxKeep + 1),
);
} catch (err) {
this.logger.debug(
`recordAccess failed for ${baseId}: ${(err as Error).message}`,
);
}
})();
}
private tryGetRedisClient(): Redis | null {
if (!this.redisService) return null;
try {
return this.redisService.getOrNil();
} catch {
return null;
}
}
}
// Convert a DuckDB row object back into the BaseRow JSON shape. The builder
// projects one column per user property; typed columns (DOUBLE, BOOLEAN,
// TIMESTAMPTZ) round-trip as JS primitives / Date objects. We reconstruct
// `cells` directly from the per-property columns so the JSON payload matches
// what Postgres returns.
function shapeBaseRow(
raw: Record<string, unknown>,
specs: ColumnSpec[],
_sortBuilds: SortBuild[],
): BaseRow {
const cells: Record<string, unknown> = {};
for (const spec of specs) {
if (!spec.property) continue; // system columns handled below
const v = raw[spec.column];
cells[spec.property.id] = normaliseCellValue(v, spec);
}
return {
id: String(raw.id),
baseId: String(raw.base_id),
cells: cells as any,
position: String(raw.position),
creatorId: raw.creator_id == null ? null : String(raw.creator_id),
lastUpdatedById:
raw.last_updated_by_id == null ? null : String(raw.last_updated_by_id),
workspaceId: String(raw.workspace_id),
createdAt: toDate(raw.created_at),
updatedAt: toDate(raw.updated_at),
deletedAt: raw.deleted_at == null ? null : toDate(raw.deleted_at),
} as BaseRow;
}
function normaliseCellValue(value: unknown, spec: ColumnSpec): unknown {
if (value == null) return null;
switch (spec.ddlType) {
case 'VARCHAR':
return String(value);
case 'DOUBLE':
return typeof value === 'number' ? value : Number(value);
case 'BOOLEAN':
return Boolean(value);
case 'TIMESTAMPTZ': {
if (value instanceof Date) return value.toISOString();
return String(value);
}
case 'JSON': {
if (typeof value === 'string') {
try {
return JSON.parse(value);
} catch {
return value;
}
}
return value;
}
default:
return value;
}
}
function toDate(value: unknown): Date {
if (value instanceof Date) return value;
return new Date(String(value));
}
// System property type → system column on base_rows (mirrors the map in
// collection-loader.ts). Kept local to avoid a circular import.
const SYSTEM_PROPERTY_COLUMN_LOOKUP: Record<string, string> = {
[BasePropertyType.CREATED_AT]: 'createdAt',
[BasePropertyType.LAST_EDITED_AT]: 'updatedAt',
[BasePropertyType.LAST_EDITED_BY]: 'lastUpdatedById',
};
// Mirror of collection-loader's `readFromRow`, but keyed off a generic event
// payload, which may be camelCase JSON because it arrived via EventEmitter /
// Redis rather than straight from Kysely. The function therefore tolerates
// both the wire shape and the repo shape.
function readFromRowEvent(
row: Record<string, unknown>,
spec: ColumnSpec,
): unknown {
switch (spec.column) {
case 'id':
return row.id;
case 'base_id':
return row.baseId ?? row.base_id;
case 'workspace_id':
return row.workspaceId ?? row.workspace_id;
case 'creator_id':
return row.creatorId ?? row.creator_id;
case 'position':
return row.position;
case 'created_at':
return row.createdAt ?? row.created_at;
case 'updated_at':
return row.updatedAt ?? row.updated_at;
case 'last_updated_by_id':
return row.lastUpdatedById ?? row.last_updated_by_id;
case 'deleted_at':
return null;
case 'search_text':
return '';
}
const prop = spec.property;
if (!prop) return null;
const sysColumn = SYSTEM_PROPERTY_COLUMN_LOOKUP[prop.type];
if (sysColumn) return row[sysColumn] ?? null;
const cells = (row.cells as Record<string, unknown> | null) ?? {};
return cells[prop.id] ?? null;
}
function quoteIdent(name: string): string {
return `"${name.replace(/"/g, '""')}"`;
}
/*
* Convert a DuckDB row object back to the BaseRow JSON shape returned to
* API callers. Kept inline (not exported) because it's a pure derivation
* from the ColumnSpec list.
*/
function shapeBaseRow(
raw: Record<string, unknown>,
specs: ColumnSpec[],
sortBuilds: SortBuild[],
): BaseRow {
const cells: Record<string, unknown> = {};
for (const spec of specs) {
if (!spec.property) continue;
const val = raw[spec.column];
if (val == null) continue;
if (spec.ddlType === 'JSON' && typeof val === 'string') {
try {
cells[spec.property.id] = JSON.parse(val);
} catch {
cells[spec.property.id] = val;
}
} else {
cells[spec.property.id] = val;
}
}
return {
id: raw.id as string,
baseId: raw.base_id as string,
workspaceId: raw.workspace_id as string,
creatorId: raw.creator_id as string,
position: raw.position as string,
createdAt: coerceDate(raw.created_at),
updatedAt: coerceDate(raw.updated_at),
lastUpdatedById: raw.last_updated_by_id as string,
deletedAt: null,
cells,
} as BaseRow;
}
function coerceDate(v: unknown): Date {
if (v instanceof Date) return v;
if (typeof v === 'string') return new Date(v);
return new Date(0);
}
function readFromRowEvent(
row: Record<string, unknown>,
spec: ColumnSpec,
): unknown {
switch (spec.column) {
case 'id': return row.id ?? null;
case 'base_id': return row.baseId ?? row.base_id ?? null;
case 'workspace_id': return row.workspaceId ?? row.workspace_id ?? null;
case 'creator_id': return row.creatorId ?? row.creator_id ?? null;
case 'position': return row.position ?? null;
case 'created_at': return row.createdAt ?? row.created_at ?? null;
case 'updated_at': return row.updatedAt ?? row.updated_at ?? null;
case 'last_updated_by_id': return row.lastUpdatedById ?? row.last_updated_by_id ?? null;
case 'deleted_at': return null;
case 'search_text': return '';
}
const prop = spec.property;
if (!prop) return null;
if (
prop.type === BasePropertyType.CREATED_AT ||
prop.type === BasePropertyType.LAST_EDITED_AT ||
prop.type === BasePropertyType.LAST_EDITED_BY
) {
return null;
}
const cells = (row.cells as Record<string, unknown> | null) ?? {};
return cells[prop.id] ?? null;
}
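// Editorial example: both wire shapes resolve to the same value, which is the
// point of the dual `row.baseId ?? row.base_id` lookups above (spec shape
// abbreviated for illustration):
//   const spec = { column: 'base_id', ddlType: 'VARCHAR', indexable: true };
//   readFromRowEvent({ baseId: 'b1' }, spec as ColumnSpec);  // => 'b1' (wire)
//   readFromRowEvent({ base_id: 'b1' }, spec as ColumnSpec); // => 'b1' (repo)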
@@ -1,13 +1,27 @@
import { Injectable, Logger } from '@nestjs/common';
import { DuckDBInstance } from '@duckdb/node-api';
import { BaseRepo } from '@docmost/db/repos/base/base.repo';
import { BasePropertyRepo } from '@docmost/db/repos/base/base-property.repo';
import { buildColumnSpecs } from './column-types';
import { buildLoaderSql } from './loader-sql';
import { LoadedCollection } from './query-cache.types';
import { PostgresExtensionService } from './postgres-extension.service';
import { baseSchemaName } from './schema-name';
import { DuckDbRuntime } from './duckdb-runtime';
import { QueryCacheConfigProvider } from './query-cache.config';
import { LoadedCollection } from './query-cache.types';
/*
* Loads a base into the shared DuckDB runtime as an attached in-memory
* database (`<schema>.rows`). Steps:
*
* 1. Attach a per-base schema.
* 2. Run `CREATE TABLE <schema>.rows AS SELECT ... FROM postgres_query(...)`
* via the writer connection — Postgres does the JSONB extraction.
* 3. Declare `PRIMARY KEY (id)` on the new table.
* 4. Build ART indexes on every indexable column.
* 5. Count rows and return a LoadedCollection metadata record.
*
* Error path: detach the schema before propagating the error, so we don't
* leak an empty attached DB into the runtime.
*/
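/*
 * Editorial sketch: `DuckDbRuntime` itself is not part of this diff. Inferred
 * from its call sites (isReady, attachBase, detachBase, getWriter, withReader),
 * its surface is roughly the following. Names and SQL here are assumptions,
 * not the project's actual implementation:
 *
 *   interface DuckDbRuntimeSketch {
 *     isReady(): boolean;
 *     // Presumably ATTACH ':memory:' AS <schema>: one in-memory database per
 *     // base, all inside the single shared DuckDBInstance.
 *     attachBase(schema: string): Promise<void>;
 *     // Presumably DETACH <schema>, tolerant of "not attached" errors so that
 *     // invalidate() and LRU eviction stay idempotent.
 *     detachBase(schema: string): Promise<void>;
 *     // The one serialized connection used for DDL and writes.
 *     getWriter(): DuckDBConnection;
 *     // Leases a read connection for the duration of the callback.
 *     withReader<T>(fn: (conn: DuckDBConnection) => Promise<T>): Promise<T>;
 *   }
 */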
@Injectable()
export class CollectionLoader {
private readonly logger = new Logger(CollectionLoader.name);
@@ -15,89 +29,54 @@ export class CollectionLoader {
constructor(
private readonly baseRepo: BaseRepo,
private readonly basePropertyRepo: BasePropertyRepo,
private readonly pgExtension: PostgresExtensionService,
private readonly runtime: DuckDbRuntime,
private readonly config: QueryCacheConfigProvider,
) {}
async load(baseId: string, workspaceId: string): Promise<LoadedCollection> {
if (!this.pgExtension.isReady()) {
if (!this.runtime.isReady()) {
throw new Error(
`Cannot load collection ${baseId}: postgres extension not ready. ` +
'Check PostgresExtensionService bootstrap logs.',
`Cannot load collection ${baseId}: duckdb runtime not ready. ` +
`Check DuckDbRuntime bootstrap logs.`,
);
}
const base = await this.baseRepo.findById(baseId);
if (!base) {
throw new Error(`Base ${baseId} not found`);
}
if (!base) throw new Error(`Base ${baseId} not found`);
const schemaVersion = (base as any).schemaVersion ?? 1;
const properties = await this.basePropertyRepo.findByBaseId(baseId);
const specs = buildColumnSpecs(properties);
const schema = baseSchemaName(baseId);
const { memoryLimit, threads, tempDirectory } = this.config.config;
// Ensure the temp directory exists so DuckDB can spill to it.
// Swallow errors — if creation fails, DuckDB will fail its own sanity
// check and we'll log that instead of crashing here.
try {
const fs = require('node:fs');
fs.mkdirSync(tempDirectory, { recursive: true });
} catch {
/* swallow */
}
const instance = await DuckDBInstance.create(':memory:', {
memory_limit: memoryLimit,
threads: String(threads),
temp_directory: tempDirectory,
});
const connection = await instance.connect();
await this.runtime.attachBase(schema);
try {
await this.pgExtension.configureOnConnection(connection);
const writer = this.runtime.getWriter();
// Disable insertion-order preservation during bulk load — DuckDB's docs
// explicitly recommend this to reduce memory pressure on large inserts. Our
// loader doesn't depend on insertion order (we sort via indexes or keyset
// cursors later), so this is a free memory saving.
await connection.run('SET preserve_insertion_order = false');
// Bulk load via CREATE TABLE AS SELECT. JSONB extraction happens
// server-side via the base_cell_* helpers; DuckDB streams typed
// columns over COPY BINARY into its vectorized insert path.
const sql = buildLoaderSql(specs, baseId, workspaceId);
const sql = buildLoaderSql(specs, baseId, workspaceId, schema);
if (this.config.config.trace) {
console.log(
'[cache-trace]',
JSON.stringify({
phase: 'loader.sql',
baseId,
schema,
length: sql.length,
sql,
}),
);
}
await connection.run(sql);
await writer.run(sql);
// Release the PG connection held by the ATTACH — we're done with
// Postgres; all subsequent queries run purely against the local table.
await this.pgExtension.detach(connection);
// CREATE TABLE AS copies data but not constraints. Re-declare the primary
// key so INSERT OR REPLACE (used by applyChange.upsertRow) has a conflict
// target. This also backs id lookups with an implicit index, speeding up
// per-row upsert/delete.
await connection.run('ALTER TABLE rows ADD PRIMARY KEY (id)');
await writer.run(`ALTER TABLE ${schema}.rows ADD PRIMARY KEY (id)`);
// Build ART indexes on indexable columns.
for (const spec of specs) {
if (!spec.indexable) continue;
const safe = spec.column.replace(/[^a-zA-Z0-9_]/g, '_');
const tIdx = this.config.config.trace ? Date.now() : 0;
await connection.run(
`CREATE INDEX ${quoteIdent(`idx_${safe}`)} ON rows (${quoteIdent(spec.column)})`,
await writer.run(
`CREATE INDEX ${schema}_${safe}_idx ON ${schema}.rows (${quoteIdent(spec.column)})`,
);
if (this.config.config.trace) {
console.log(
@@ -105,6 +84,7 @@ export class CollectionLoader {
JSON.stringify({
phase: 'loader.index',
baseId,
schema,
column: spec.column,
ms: Date.now() - tIdx,
}),
@@ -112,67 +92,49 @@ export class CollectionLoader {
}
}
const countResult = await connection.runAndReadAll(
'SELECT count(*) AS c FROM rows',
const countResult = await writer.runAndReadAll(
`SELECT count(*) AS c FROM ${schema}.rows`,
);
const rowCount = Number(
(countResult.getRowObjects()[0] as { c: bigint | number }).c,
);
const memoryResult = await connection.runAndReadAll(
`SELECT
COALESCE(sum(memory_usage_bytes), 0)::BIGINT AS used_bytes,
COALESCE(sum(temporary_storage_bytes), 0)::BIGINT AS spilled_bytes
FROM duckdb_memory()`,
);
const mem = memoryResult.getRowObjects()[0] as {
used_bytes: bigint | number;
spilled_bytes: bigint | number;
};
const heapBytes = Number(mem.used_bytes);
const spilledBytes = Number(mem.spilled_bytes);
const approxBytes = estimateBytes(rowCount, specs.length);
this.logger.debug(
`Loaded ${rowCount} rows for base ${baseId} ` +
`(schemaVersion=${schemaVersion}, heap=${fmtMb(heapBytes)}MB, spilled=${fmtMb(spilledBytes)}MB)`,
`(schemaVersion=${schemaVersion}, schema=${schema}, approxMB=${fmtMb(approxBytes)})`,
);
return {
baseId,
schema,
schemaVersion,
columns: specs,
instance,
connection,
lastAccessedAt: Date.now(),
rowCount,
heapBytes,
spilledBytes,
approxBytes,
};
} catch (err) {
try {
await this.pgExtension.detach(connection);
} catch {
/* swallow */
}
try {
connection.closeSync();
} catch {
/* swallow */
}
try {
instance.closeSync();
} catch {
/* swallow */
}
await this.runtime.detachBase(schema);
} catch { /* swallow */ }
throw err;
}
}
}
function quoteIdent(name: string): string {
return `"${name.replace(/"/g, '""')}"`;
}
function estimateBytes(rowCount: number, columnCount: number): number {
// Rough heuristic: ~64 bytes per cell (typed value + ART index entry
// overhead). Within 2x of actual for typical schemas; used for
// reporting only, not for eviction decisions.
return rowCount * columnCount * 64;
}
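// Editorial example: estimateBytes(100_000, 20) = 100_000 * 20 * 64
// = 128_000_000 bytes, reported by residencySnapshot() as approxMb ≈ 122.1.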
function fmtMb(bytes: number): string {
return (bytes / (1024 * 1024)).toFixed(1);
}
function quoteIdent(name: string): string {
return `"${name.replace(/"/g, '""')}"`;
}
@@ -1,129 +0,0 @@
import { DuckDBInstance } from '@duckdb/node-api';
import { PostgresExtensionService } from './postgres-extension.service';
import { QueryCacheConfigProvider } from './query-cache.config';
const makeConfig = (
overrides: Partial<QueryCacheConfigProvider['config']> = {},
): QueryCacheConfigProvider =>
({
config: {
enabled: true,
minRows: 25_000,
maxCollections: 50,
warmTopN: 50,
memoryLimit: '64MB',
threads: 2,
...overrides,
},
}) as unknown as QueryCacheConfigProvider;
const makeEnv = (
overrides: { dbUrl?: string } = {},
): { getDatabaseURL: () => string } => ({
getDatabaseURL: () => overrides.dbUrl ?? process.env.DATABASE_URL ?? '',
});
describe('PostgresExtensionService', () => {
it('no-ops when the query cache is disabled', async () => {
const svc = new PostgresExtensionService(
makeConfig({ enabled: false }),
makeEnv() as any,
);
await expect(svc.onApplicationBootstrap()).resolves.toBeUndefined();
expect(svc.isReady()).toBe(false);
});
it('installs and loads the postgres extension on bootstrap when enabled', async () => {
const svc = new PostgresExtensionService(makeConfig(), makeEnv() as any);
// First run hits the network (extensions.duckdb.org). Subsequent runs read from cache.
await svc.onApplicationBootstrap();
expect(svc.isReady()).toBe(true);
});
it('configureOnConnection loads the extension and attaches pg in a fresh instance', async () => {
const svc = new PostgresExtensionService(makeConfig(), makeEnv() as any);
await svc.onApplicationBootstrap();
const instance = await DuckDBInstance.create(':memory:');
const conn = await instance.connect();
try {
await svc.configureOnConnection(conn);
// Smoke-test: query any PG system table. DuckDB's postgres scanner
// exposes PG catalog tables under the attached schema's pg_catalog.
const res = await conn.runAndReadAll(
'SELECT count(*) AS c FROM pg.pg_catalog.pg_database',
);
const row = res.getRowObjects()[0] as { c: bigint | number };
expect(Number(row.c)).toBeGreaterThan(0);
await svc.detach(conn);
} finally {
conn.closeSync();
instance.closeSync();
}
});
it('detach is idempotent', async () => {
const svc = new PostgresExtensionService(makeConfig(), makeEnv() as any);
await svc.onApplicationBootstrap();
const instance = await DuckDBInstance.create(':memory:');
const conn = await instance.connect();
try {
await svc.configureOnConnection(conn);
await svc.detach(conn);
await expect(svc.detach(conn)).resolves.toBeUndefined();
} finally {
conn.closeSync();
instance.closeSync();
}
});
it('configureOnConnection throws a clear error when bootstrap never ran', async () => {
const svc = new PostgresExtensionService(makeConfig(), makeEnv() as any);
// Intentionally NOT calling onApplicationBootstrap.
const instance = await DuckDBInstance.create(':memory:');
const conn = await instance.connect();
try {
await expect(svc.configureOnConnection(conn)).rejects.toThrow(/not ready/i);
} finally {
conn.closeSync();
instance.closeSync();
}
});
it('includes the bootstrap failure reason in the not-ready error', async () => {
// Force the not-ready path with a deterministic failure message. Engineering
// a real bootstrap error (broken DB URL, failing SQL) is flaky, so instead we
// subclass the service and set its failure state directly, accepting a small
// amount of test-only access.
class BreakingService extends PostgresExtensionService {
async onApplicationBootstrap(): Promise<void> {
// Skip the real bootstrap entirely: record a synthetic failure and
// leave ready=false so configureOnConnection surfaces the reason.
(this as any).ready = false;
(this as any).bootstrapFailure = 'simulated boot failure XYZ';
}
}
const svc = new BreakingService(
makeConfig(),
makeEnv() as any,
);
await svc.onApplicationBootstrap();
const instance = await DuckDBInstance.create(':memory:');
const conn = await instance.connect();
try {
await expect(svc.configureOnConnection(conn)).rejects.toThrow(
/simulated boot failure XYZ/,
);
} finally {
conn.closeSync();
instance.closeSync();
}
});
});
@@ -1,134 +0,0 @@
import {
Injectable,
Logger,
OnApplicationBootstrap,
} from '@nestjs/common';
import { DuckDBInstance, DuckDBConnection } from '@duckdb/node-api';
import { QueryCacheConfigProvider } from './query-cache.config';
import { EnvironmentService } from '../../../integrations/environment/environment.service';
/*
* Owns the lifecycle of DuckDB's `postgres` extension for the query-cache
* module. Responsibilities:
*
* 1. Install the extension once per process at bootstrap. DuckDB caches the
* binary to `$HOME/.duckdb/extensions/...`; subsequent LOADs are offline.
* We use the default DuckDB install path (fetches from
* `extensions.duckdb.org`) — air-gapped bundling is a separate plan.
*
* 2. Configure a fresh DuckDBConnection so a caller can run a single bulk
* load query against Postgres via `CREATE TABLE AS SELECT ... FROM pg.*`.
* We ATTACH `pg` in READ_ONLY mode using the connection URI inline,
* scoped to the DuckDB instance, with no disk state.
*
* 3. DETACH on request so the underlying PG connection is released
* immediately after the load completes. Per-instance PG attachments are
* transient: held only during CREATE TABLE AS, never across queries.
*
* When the master query-cache flag is off, this service is a no-op. No
* instance is created, no network call is made.
*/
@Injectable()
export class PostgresExtensionService implements OnApplicationBootstrap {
private readonly logger = new Logger(PostgresExtensionService.name);
private ready = false;
private bootstrapFailure: string | null = null;
constructor(
private readonly config: QueryCacheConfigProvider,
private readonly env: EnvironmentService,
) {}
async onApplicationBootstrap(): Promise<void> {
if (!this.config.config.enabled) {
this.logger.log('query cache disabled; skipping postgres extension install');
return;
}
const bootstrap = await DuckDBInstance.create(':memory:');
const conn = await bootstrap.connect();
try {
// INSTALL writes to $HOME/.duckdb/extensions/<version>/<platform>/.
// First ever boot: fetches from extensions.duckdb.org. Subsequent boots:
// local-disk no-op.
await conn.run('INSTALL postgres');
await conn.run('LOAD postgres');
this.ready = true;
this.logger.log('postgres extension installed and loaded');
} catch (err) {
const error = err as Error;
this.logger.error(
`Failed to install/load postgres extension: ${error.message}`,
);
if (error.stack) this.logger.error(error.stack);
// Do NOT rethrow. A failed extension install must not crash the whole
// app: the cache service handles this by falling through to Postgres
// when `isReady()` returns false (see `CollectionLoader.load`).
this.ready = false;
this.bootstrapFailure = error.message;
} finally {
conn.closeSync();
bootstrap.closeSync();
}
}
isReady(): boolean {
return this.ready;
}
/*
* Prepares a fresh DuckDBConnection for a bulk-load query against Postgres.
* Must be paired with `detach()` once CREATE TABLE AS completes.
*
* Safe to call on a just-created instance: LOAD reads from the on-disk
* extension cache populated at bootstrap (no network call).
*/
async configureOnConnection(conn: DuckDBConnection): Promise<void> {
if (!this.ready) {
const reason = this.bootstrapFailure
? `: ${this.bootstrapFailure}`
: '';
throw new Error(
`PostgresExtensionService not ready${reason}. Check bootstrap logs.`,
);
}
const dbUrl = this.env.getDatabaseURL();
if (!dbUrl) {
throw new Error('DATABASE_URL is empty; cannot ATTACH from duckdb');
}
await conn.run('LOAD postgres');
// DuckDB 1.5's `postgres` extension secret syntax expects discrete
// HOST/PORT/etc. parameters and rejects the single CONNECTION_STRING
// field. Passing the URI directly as ATTACH's first argument keeps the
// connection details opaque here and still binds the attachment to this
// DuckDB instance only. READ_ONLY guards against the loader accidentally
// mutating Postgres.
await conn.run(
`ATTACH ${escapeSqlString(dbUrl)} AS pg (TYPE POSTGRES, READ_ONLY)`,
);
}
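// Editorial example: with DATABASE_URL=postgres://app:pw@db:5432/docmost the
// statement above ran as
//   ATTACH 'postgres://app:pw@db:5432/docmost' AS pg (TYPE POSTGRES, READ_ONLY)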
/*
* Releases the PG connection held by this DuckDBConnection's ATTACH.
* Idempotent — safe to call repeatedly, swallows "not attached" errors.
*/
async detach(conn: DuckDBConnection): Promise<void> {
try {
await conn.run('DETACH pg');
} catch (err) {
const msg = (err as Error).message ?? '';
// DuckDB wording: "Failed to detach database with name \"pg\": database
// not found". Also handle older "not attached" / catalog-error shapes.
if (!/not attached|not found|does not exist|catalog|failed to detach/i.test(msg)) {
throw err;
}
}
}
}
function escapeSqlString(s: string): string {
return `'${s.replace(/'/g, "''")}'`;
}
@@ -1,27 +1,27 @@
import { Module } from '@nestjs/common';
import { QueryCacheConfigProvider } from './query-cache.config';
import { DuckDbRuntime } from './duckdb-runtime';
import { BaseQueryCacheService } from './base-query-cache.service';
import { BaseQueryRouter } from './base-query-router';
import { CollectionLoader } from './collection-loader';
import { BaseQueryCacheWriteConsumer } from './base-query-cache.write-consumer';
import { BaseQueryCacheSubscriber } from './base-query-cache.subscriber';
import { PostgresExtensionService } from './postgres-extension.service';
@Module({
providers: [
QueryCacheConfigProvider,
PostgresExtensionService,
DuckDbRuntime,
CollectionLoader,
BaseQueryCacheService,
BaseQueryRouter,
CollectionLoader,
BaseQueryCacheWriteConsumer,
BaseQueryCacheSubscriber,
],
exports: [
BaseQueryCacheService,
BaseQueryRouter,
DuckDbRuntime,
QueryCacheConfigProvider,
PostgresExtensionService,
],
})
export class BaseQueryCacheModule {}
export class QueryCacheModule {}
@@ -1,4 +1,3 @@
import type { DuckDBConnection, DuckDBInstance } from '@duckdb/node-api';
import type { BaseProperty } from '@docmost/db/types/entity.types';
export type DuckDbColumnType =
@@ -9,31 +8,37 @@ export type DuckDbColumnType =
| 'JSON';
export type ColumnSpec = {
// The uuid of the property (user-defined props) or a stable literal
// ('id', 'position', 'created_at', 'updated_at', 'last_updated_by_id',
// 'deleted_at', 'search_text') for system columns.
/*
* The uuid of the property (user-defined props) or a stable literal
* ('id', 'position', 'created_at', 'updated_at', 'last_updated_by_id',
* 'deleted_at', 'search_text') for system columns.
*/
column: string;
ddlType: DuckDbColumnType;
indexable: boolean;
// For user-defined props we keep the source BaseProperty so callers can
// resolve the extraction rule from JSON.
property?: Pick<BaseProperty, 'id' | 'type' | 'typeOptions'>;
};
/*
* A base held in the shared DuckDB instance. Instead of owning a
* `DuckDBInstance` and `DuckDBConnection`, it now just remembers the schema
* name of its attached in-memory database. The runtime owns the actual
* connections; this is pure metadata.
*/
export type LoadedCollection = {
baseId: string;
schema: string; // e.g. "b_019c69a51d847985a7f68ee2871d8669"
schemaVersion: number;
columns: ColumnSpec[];
instance: DuckDBInstance;
connection: DuckDBConnection;
lastAccessedAt: number;
// cached; set by loader, maintained by applyChange
rowCount: number;
// Memory stats captured immediately after load. Static until next
// explicit refresh — see `BaseQueryCacheService.refreshMemoryStats` if you
// need up-to-date figures after many applyChange() mutations.
heapBytes: number;
spilledBytes: number;
/*
* Estimated in-memory footprint, in bytes. DuckDB does not expose
* per-attached-db memory accounting, so this is a rough heuristic
* computed at load time: rowCount × columns.length × ~64 bytes. Used
* for cache-size reporting; not for eviction decisions.
*/
approxBytes: number;
};
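// Editorial sketch: `baseSchemaName` (schema-name.ts) is not in this diff.
// Judging from the example above ("b_" + 32 hex chars) it plausibly derives a
// bare SQL identifier from the base uuid. Assumption, not the actual source:
function baseSchemaNameSketch(baseId: string): string {
  // uuid dashes are not identifier-safe; strip them and prefix.
  return `b_${baseId.replace(/-/g, '')}`;
}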
export type ChangeEnvelope =