From 38cd94b2d7cd68bb1244d5f53d18a0805b5fc531 Mon Sep 17 00:00:00 2001
From: Philipinho <16838612+Philipinho@users.noreply.github.com>
Date: Thu, 23 Apr 2026 16:40:14 +0100
Subject: [PATCH] refactor(base): single duckdb instance with per-base attached databases

---
 .../query-cache/base-query-cache.service.ts   | 565 ++++++++----------
 .../base/query-cache/collection-loader.ts     | 136 ++---
 .../postgres-extension.service.spec.ts        | 129 ----
 .../query-cache/postgres-extension.service.ts | 134 -----
 .../base/query-cache/query-cache.module.ts    |  10 +-
 .../base/query-cache/query-cache.types.ts     |  33 +-
 6 files changed, 325 insertions(+), 682 deletions(-)
 delete mode 100644 apps/server/src/core/base/query-cache/postgres-extension.service.spec.ts
 delete mode 100644 apps/server/src/core/base/query-cache/postgres-extension.service.ts

diff --git a/apps/server/src/core/base/query-cache/base-query-cache.service.ts b/apps/server/src/core/base/query-cache/base-query-cache.service.ts
index f2ea353d..b3cdf286 100644
--- a/apps/server/src/core/base/query-cache/base-query-cache.service.ts
+++ b/apps/server/src/core/base/query-cache/base-query-cache.service.ts
@@ -27,6 +27,7 @@ import {
 import { QueryCacheConfigProvider } from './query-cache.config';
 import { CollectionLoader } from './collection-loader';
 import { buildDuckDbListQuery } from './duckdb-query-builder';
+import { DuckDbRuntime } from './duckdb-runtime';
 import { BasePropertyType } from '../base.schemas';
 import {
   ChangeEnvelope,
@@ -55,6 +56,7 @@ export class BaseQueryCacheService
     private readonly configProvider: QueryCacheConfigProvider,
     private readonly baseRepo: BaseRepo,
     private readonly collectionLoader: CollectionLoader,
+    private readonly runtime: DuckDbRuntime,
     @Optional() private readonly redisService: RedisService | null = null,
     @Optional() private readonly env: EnvironmentService | null = null,
   ) {}
@@ -62,8 +64,14 @@ export class BaseQueryCacheService
   async onApplicationBootstrap(): Promise<void> {
     const { enabled, warmTopN } = this.configProvider.config;
     if (!enabled) return;
+    if (!this.runtime.isReady()) {
+      this.logger.warn('runtime not ready; skipping warm-up');
+      return;
+    }
+
     const redis = this.tryGetRedisClient();
     if (!redis) return;
+
     try {
       const ids = await redis.zrevrange(
         'base-query-cache:recent',
@@ -89,41 +97,10 @@ export class BaseQueryCacheService
     }
   }
 
-  private tryGetRedisClient(): Redis | null {
-    if (!this.redisService) return null;
-    try {
-      return this.redisService.getOrNil();
-    } catch {
-      return null;
-    }
-  }
-
-  private recordAccess(baseId: string): void {
-    if (!this.configProvider.config.enabled) return;
-    const redis = this.tryGetRedisClient();
-    if (!redis) return;
-    const nowMs = Date.now();
-    const maxKeep = this.configProvider.config.maxCollections * 10;
-    void (async () => {
-      try {
-        await redis.zadd('base-query-cache:recent', nowMs, baseId);
-        await redis.zremrangebyrank(
-          'base-query-cache:recent',
-          0,
-          -(maxKeep + 1),
-        );
-      } catch (err) {
-        this.logger.debug(
-          `recordAccess failed for ${baseId}: ${(err as Error).message}`,
-        );
-      }
-    })();
-  }
-
   async onModuleDestroy(): Promise<void> {
-    for (const [, collection] of this.collections) {
-      this.closeCollection(collection);
-    }
+    // The runtime owns the instance/connection lifecycle; we just clear
+    // our metadata. DETACH is a no-op during shutdown because the instance
+    // is closing anyway.
     this.collections.clear();
   }
 
@@ -133,6 +110,7 @@ export class BaseQueryCacheService
     opts: CacheListOpts,
  ): Promise<CacheListResult<BaseRow>> {
     const debug = this.env?.getBaseQueryCacheDebug() ?? false;
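+    // Read path, in brief: build a keyset-paginated query against the
+    // base's attached schema, bind parameters positionally, run it on a
+    // reader connection, then shape rows and encode cursors from the sort
+    // keys. Illustrative query shape only — the real SQL comes from
+    // buildDuckDbListQuery:
+    //
+    //   SELECT ... FROM <schema>.rows
+    //   ORDER BY <sort keys>, position, id
+    //   LIMIT <limit + 1>  -- the extra row drives hasNextPage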
+    const trace = this.env?.getBaseQueryCacheTrace?.() ?? false;
     const tStart = debug ? Date.now() : 0;
     const tEnsure = debug ? Date.now() : 0;
@@ -143,9 +121,7 @@ export class BaseQueryCacheService
       opts.sorts && opts.sorts.length > 0
         ? buildSorts(opts.sorts, opts.schema)
         : [];
-
     const cursor = makeCursor(sortBuilds, CURSOR_TAIL_KEYS);
-
     const sortFieldKeys = sortBuilds.map((s) => s.key);
     const allFieldKeys = [...sortFieldKeys, 'position', 'id'];
@@ -164,42 +140,45 @@ export class BaseQueryCacheService
         limit: opts.pagination.limit,
         afterKeys: afterKeys as any,
       },
+      schema: collection.schema,
     });
 
-    if (this.env?.getBaseQueryCacheTrace?.() ?? false) {
+    if (trace) {
       console.log(
         '[cache-trace]',
         JSON.stringify({
           phase: 'query.sql',
           baseId: baseId.slice(0, 8),
+          schema: collection.schema,
           sql,
           params,
         }),
       );
     }
 
-    const prepared = await collection.connection.prepare(sql);
-    for (let i = 0; i < params.length; i++) {
-      const p = params[i];
-      const oneBased = i + 1;
-      if (p === null || p === undefined) {
-        prepared.bindNull(oneBased);
-      } else if (typeof p === 'string') {
-        prepared.bindVarchar(oneBased, p);
-      } else if (typeof p === 'number') {
-        prepared.bindDouble(oneBased, p);
-      } else if (typeof p === 'boolean') {
-        prepared.bindBoolean(oneBased, p);
-      } else if (p instanceof Date) {
-        prepared.bindVarchar(oneBased, p.toISOString());
-      } else {
-        prepared.bindVarchar(oneBased, JSON.stringify(p));
-      }
-    }
-
     const tExec = debug ? Date.now() : 0;
-    const reader = await prepared.runAndReadAll();
-    const duckRows = reader.getRowObjectsJS();
+    const duckRows = await this.runtime.withReader(async (conn) => {
+      const prepared = await conn.prepare(sql);
+      for (let i = 0; i < params.length; i++) {
+        const p = params[i];
+        const oneBased = i + 1;
+        if (p === null || p === undefined) {
+          prepared.bindNull(oneBased);
+        } else if (typeof p === 'string') {
+          prepared.bindVarchar(oneBased, p);
+        } else if (typeof p === 'number') {
+          prepared.bindDouble(oneBased, p);
+        } else if (typeof p === 'boolean') {
+          prepared.bindBoolean(oneBased, p);
+        } else if (p instanceof Date) {
+          prepared.bindVarchar(oneBased, p.toISOString());
+        } else {
+          prepared.bindVarchar(oneBased, JSON.stringify(p));
+        }
+      }
+      const reader = await prepared.runAndReadAll();
+      return reader.getRowObjectsJS();
+    });
     const execMs = debug ? Date.now() - tExec : 0;
 
     const hasNextPage = duckRows.length > opts.pagination.limit;
@@ -231,12 +210,9 @@ export class BaseQueryCacheService
     const endRow = duckRows[duckRows.length - 1];
     const startRow = duckRows[0];
-
     const encodeFromRow = (raw: Record<string, unknown>): string => {
       const entries: Array<[string, unknown]> = [];
-      for (const sb of sortBuilds) {
-        entries.push([sb.key, raw[sb.key]]);
-      }
+      for (const sb of sortBuilds) entries.push([sb.key, raw[sb.key]]);
       entries.push(['position', raw.position]);
       entries.push(['id', raw.id]);
       return cursor.encodeCursor(entries);
@@ -276,13 +252,10 @@ export class BaseQueryCacheService
   async invalidate(baseId: string): Promise<void> {
     const collection = this.collections.get(baseId);
     if (!collection) return;
-    this.closeCollection(collection);
+    await this.runtime.detachBase(collection.schema);
     this.collections.delete(baseId);
   }
 
-  // Test-only introspection of the resident cache. Used by the LRU eviction
-  // integration spec to assert which collections are currently loaded without
-  // reaching into the private `collections` map.
   isResident(baseId: string): boolean {
     return this.collections.has(baseId);
   }
@@ -291,48 +264,38 @@ export class BaseQueryCacheService
     return this.collections.size;
   }
 
-  // Production-facing fast path for the router: returns the resident
-  // collection without triggering a load. Used to avoid a per-request
-  // Postgres COUNT when the cached rowCount already answers the question.
   peek(baseId: string): LoadedCollection | undefined {
     return this.collections.get(baseId);
   }
 
-  // Returns the memory footprint of every currently resident collection.
   residencySnapshot(): Array<{
     baseId: string;
+    schema: string;
     rows: number;
-    heapMb: number;
-    spilledMb: number;
+    approxMb: number;
   }> {
     const out: Array<{
       baseId: string;
+      schema: string;
       rows: number;
-      heapMb: number;
-      spilledMb: number;
+      approxMb: number;
     }> = [];
     for (const [baseId, c] of this.collections) {
       out.push({
         baseId,
+        schema: c.schema,
         rows: c.rowCount,
-        heapMb: +(c.heapBytes / (1024 * 1024)).toFixed(1),
-        spilledMb: +(c.spilledBytes / (1024 * 1024)).toFixed(1),
+        approxMb: +(c.approxBytes / (1024 * 1024)).toFixed(1),
       });
     }
     return out;
   }
 
-  /*
-   * Apply a change envelope received from Redis pub/sub to the local
-   * collection (if any). Rows that target bases not resident on this node
-   * are ignored — the next `list` call will load them fresh from Postgres.
-   * If any patch step throws (e.g. schema drift between this node and the
-   * publisher) we eagerly invalidate so the next `list` rebuilds cleanly
-   * rather than serving partial state.
-   */
   async applyChange(env: ChangeEnvelope): Promise<void> {
+    const trace = this.env?.getBaseQueryCacheTrace?.() ?? false;
     const collection = this.collections.get(env.baseId);
-    if (this.env?.getBaseQueryCacheTrace?.() ?? false) {
+
+    if (trace) {
       console.log(
         '[cache-trace]',
         JSON.stringify({
@@ -343,6 +306,7 @@ export class BaseQueryCacheService
         }),
       );
     }
+
     if (!collection) return;
 
     try {
@@ -378,16 +342,95 @@ export class BaseQueryCacheService
     }
   }
 
-  private async refreshRowCount(collection: LoadedCollection): Promise<void> {
-    try {
-      const res = await collection.connection.runAndReadAll(
-        'SELECT count(*) AS c FROM rows',
-      );
-      const row = res.getRowObjects()[0] as { c: bigint | number };
-      collection.rowCount = Number(row.c);
-    } catch {
-      // swallow — stale rowCount drifts at most by the size of the burst; the
-      // next reload-from-Postgres or pubsub event corrects it.
-    }
-  }
+  private async ensureLoaded(
+    baseId: string,
+    workspaceId: string,
+  ): Promise<LoadedCollection> {
+    const debug = this.env?.getBaseQueryCacheDebug() ?? false;
+    const existing = this.collections.get(baseId);
+
+    const tFind = debug ? Date.now() : 0;
+    const base = await this.baseRepo.findById(baseId);
+    const findMs = debug ? Date.now() - tFind : 0;
+    if (!base) throw new Error(`Base ${baseId} not found`);
+    const freshVersion = (base as any).schemaVersion ?? 1;
+
+    if (existing && existing.schemaVersion === freshVersion) {
+      existing.lastAccessedAt = Date.now();
+      this.recordAccess(baseId);
+      if (debug) {
+        console.log(
+          '[cache-perf]',
+          JSON.stringify({
+            phase: 'ensureLoaded.hit',
+            baseId: baseId.slice(0, 8),
+            findMs,
+          }),
+        );
+      }
+      return existing;
+    }
+
+    if (existing) {
+      await this.runtime.detachBase(existing.schema);
+      this.collections.delete(baseId);
+    }
+
+    const inFlight = this.inFlightLoads.get(baseId);
+    if (inFlight) {
+      const loaded = await inFlight;
+      this.recordAccess(baseId);
+      return loaded;
+    }
+
+    const tLoad = debug ? Date.now() : 0;
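+    // Single-flight: concurrent list() calls for the same base share this
+    // one load; the finally block below clears the slot, so a failed load
+    // is retried on the next request instead of caching the rejection.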
+    const promise = (async () => {
+      try {
+        const { maxCollections } = this.configProvider.config;
+        if (this.collections.size >= maxCollections) {
+          await this.evictLru();
+        }
+        const loaded = await this.collectionLoader.load(baseId, workspaceId);
+        this.collections.set(baseId, loaded);
+        return loaded;
+      } finally {
+        this.inFlightLoads.delete(baseId);
+      }
+    })();
+    this.inFlightLoads.set(baseId, promise);
+    const loaded = await promise;
+    const loadMs = debug ? Date.now() - tLoad : 0;
+    this.recordAccess(baseId);
+    if (debug) {
+      console.log(
+        '[cache-perf]',
+        JSON.stringify({
+          phase: 'ensureLoaded.miss',
+          baseId: baseId.slice(0, 8),
+          findMs,
+          loadMs,
+          rows: loaded.rowCount,
+          approxMb: +(loaded.approxBytes / (1024 * 1024)).toFixed(1),
+        }),
+      );
+    }
+    return loaded;
+  }
+
+  private async evictLru(): Promise<void> {
+    let oldestKey: string | null = null;
+    let oldestTime = Number.POSITIVE_INFINITY;
+    for (const [key, col] of this.collections) {
+      if (col.lastAccessedAt < oldestTime) {
+        oldestTime = col.lastAccessedAt;
+        oldestKey = key;
+      }
+    }
+    if (oldestKey) {
+      const col = this.collections.get(oldestKey)!;
+      await this.runtime.detachBase(col.schema);
+      this.collections.delete(oldestKey);
+      this.logger.debug(`Evicted LRU collection ${oldestKey}`);
+    }
+  }
@@ -398,9 +441,10 @@ export class BaseQueryCacheService
     const specs = collection.columns;
     const columnList = specs.map((s) => quoteIdent(s.column)).join(', ');
     const placeholders = specs.map(() => '?').join(', ');
-    const sql = `INSERT OR REPLACE INTO rows (${columnList}) VALUES (${placeholders})`;
+    const sql = `INSERT OR REPLACE INTO ${collection.schema}.rows (${columnList}) VALUES (${placeholders})`;
 
-    const prepared = await collection.connection.prepare(sql);
+    const writer = this.runtime.getWriter();
+    const prepared = await writer.prepare(sql);
     for (let i = 0; i < specs.length; i++) {
       const spec = specs[i];
       const oneBased = i + 1;
@@ -440,8 +484,9 @@ export class BaseQueryCacheService
     collection: LoadedCollection,
     rowId: string,
   ): Promise<void> {
-    const prepared = await collection.connection.prepare(
-      'DELETE FROM rows WHERE id = ?',
+    const writer = this.runtime.getWriter();
+    const prepared = await writer.prepare(
+      `DELETE FROM ${collection.schema}.rows WHERE id = ?`,
     );
     prepared.bindVarchar(1, rowId);
     await prepared.run();
@@ -452,239 +497,133 @@ export class BaseQueryCacheService
     rowId: string,
     position: string,
   ): Promise<void> {
-    const prepared = await collection.connection.prepare(
-      'UPDATE rows SET position = ? WHERE id = ?',
+    const writer = this.runtime.getWriter();
+    const prepared = await writer.prepare(
+      `UPDATE ${collection.schema}.rows SET position = ? WHERE id = ?`,
    );
     prepared.bindVarchar(1, position);
     prepared.bindVarchar(2, rowId);
     await prepared.run();
   }
 
-  private async ensureLoaded(
-    baseId: string,
-    workspaceId: string,
-  ): Promise<LoadedCollection> {
-    const debug = this.env?.getBaseQueryCacheDebug() ?? false;
-    // TODO(task-7): remove per-request findById once pub/sub invalidation
-    // keeps collections in sync with schema bumps.
-    const existing = this.collections.get(baseId);
-
-    const tFind = debug ? Date.now() : 0;
-    const base = await this.baseRepo.findById(baseId);
-    const findMs = debug ? Date.now() - tFind : 0;
-    if (!base) {
-      throw new Error(`Base ${baseId} not found`);
-    }
-    const freshVersion = (base as any).schemaVersion ?? 1;
-
-    if (existing && existing.schemaVersion === freshVersion) {
-      existing.lastAccessedAt = Date.now();
-      this.recordAccess(baseId);
-      if (debug) {
-        console.log(
-          '[cache-perf]',
-          JSON.stringify({
-            phase: 'ensureLoaded.hit',
-            baseId: baseId.slice(0, 8),
-            findMs,
-          }),
-        );
-      }
-      return existing;
-    }
-
-    if (existing) {
-      this.closeCollection(existing);
-      this.collections.delete(baseId);
-    }
-
-    const inFlight = this.inFlightLoads.get(baseId);
-    if (inFlight) {
-      const loaded = await inFlight;
-      this.recordAccess(baseId);
-      return loaded;
-    }
-
-    const tLoad = debug ? Date.now() : 0;
-    const promise = (async () => {
-      try {
-        const { maxCollections } = this.configProvider.config;
-        if (this.collections.size >= maxCollections) {
-          this.evictLru();
-        }
-        const loaded = await this.collectionLoader.load(baseId, workspaceId);
-        this.collections.set(baseId, loaded);
-        return loaded;
-      } finally {
-        this.inFlightLoads.delete(baseId);
-      }
-    })();
-    this.inFlightLoads.set(baseId, promise);
-    const loaded = await promise;
-    const loadMs = debug ? Date.now() - tLoad : 0;
-    this.recordAccess(baseId);
-    if (debug) {
-      console.log(
-        '[cache-perf]',
-        JSON.stringify({
-          phase: 'ensureLoaded.miss',
-          baseId: baseId.slice(0, 8),
-          findMs,
-          loadMs,
-          rows: loaded.rowCount,
-          heapMb: +(loaded.heapBytes / (1024 * 1024)).toFixed(1),
-          spilledMb: +(loaded.spilledBytes / (1024 * 1024)).toFixed(1),
-        }),
-      );
-    }
-    return loaded;
-  }
-
-  private evictLru(): void {
-    let oldestKey: string | null = null;
-    let oldestTime = Number.POSITIVE_INFINITY;
-    for (const [key, col] of this.collections) {
-      if (col.lastAccessedAt < oldestTime) {
-        oldestTime = col.lastAccessedAt;
-        oldestKey = key;
-      }
-    }
-    if (oldestKey) {
-      const col = this.collections.get(oldestKey)!;
-      this.closeCollection(col);
-      this.collections.delete(oldestKey);
-      this.logger.debug(`Evicted LRU collection ${oldestKey}`);
-    }
-  }
-
-  private closeCollection(collection: LoadedCollection): void {
-    try {
-      collection.connection.closeSync();
-    } catch (err) {
-      this.logger.warn(`Failed to close connection: ${(err as Error).message}`);
-    }
-    try {
-      collection.instance.closeSync();
-    } catch (err) {
-      this.logger.warn(`Failed to close instance: ${(err as Error).message}`);
-    }
-  }
+  private async refreshRowCount(collection: LoadedCollection): Promise<void> {
+    try {
+      const res = await this.runtime.getWriter().runAndReadAll(
+        `SELECT count(*) AS c FROM ${collection.schema}.rows`,
+      );
+      const row = res.getRowObjects()[0] as { c: bigint | number };
+      collection.rowCount = Number(row.c);
+    } catch {
+      // stale rowCount self-corrects on next reload
+    }
+  }
+
+  private recordAccess(baseId: string): void {
+    if (!this.configProvider.config.enabled) return;
+    const redis = this.tryGetRedisClient();
+    if (!redis) return;
+    const nowMs = Date.now();
+    const maxKeep = this.configProvider.config.maxCollections * 10;
+    void (async () => {
+      try {
+        await redis.zadd('base-query-cache:recent', nowMs, baseId);
+        await redis.zremrangebyrank(
+          'base-query-cache:recent',
+          0,
+          -(maxKeep + 1),
+        );
+      } catch (err) {
+        this.logger.debug(
+          `recordAccess failed for ${baseId}: ${(err as Error).message}`,
        );
      }
+    })();
+  }
+
+  private tryGetRedisClient(): Redis | null {
+    if (!this.redisService) return null;
+    try {
+      return this.redisService.getOrNil();
+    } catch {
+      return null;
+    }
+  }
 }
 
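+/*
+ * Sketch of the DuckDbRuntime surface this service relies on, inferred from
+ * the call sites above (the real interface lives in duckdb-runtime.ts):
+ *
+ *   isReady(): boolean;
+ *   attachBase(schema: string): Promise<void>;  // ATTACH ':memory:' AS <schema>
+ *   detachBase(schema: string): Promise<void>;  // DETACH <schema>, idempotent
+ *   getWriter(): DuckDBConnection;              // the single writer connection
+ *   withReader<T>(fn: (conn: DuckDBConnection) => Promise<T>): Promise<T>;
+ */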
-// Convert a DuckDB row object back into the BaseRow JSON shape. The builder
-// projects one column per user property; typed columns (DOUBLE, BOOLEAN,
-// TIMESTAMPTZ) round-trip as JS primitives / Date objects. We reconstruct
-// `cells` directly from the per-property columns so the JSON payload matches
-// what Postgres returns.
-function shapeBaseRow(
-  raw: Record<string, unknown>,
-  specs: ColumnSpec[],
-  _sortBuilds: SortBuild[],
-): BaseRow {
-  const cells: Record<string, unknown> = {};
-  for (const spec of specs) {
-    if (!spec.property) continue; // system columns handled below
-    const v = raw[spec.column];
-    cells[spec.property.id] = normaliseCellValue(v, spec);
-  }
-
-  return {
-    id: String(raw.id),
-    baseId: String(raw.base_id),
-    cells: cells as any,
-    position: String(raw.position),
-    creatorId: raw.creator_id == null ? null : String(raw.creator_id),
-    lastUpdatedById:
-      raw.last_updated_by_id == null ? null : String(raw.last_updated_by_id),
-    workspaceId: String(raw.workspace_id),
-    createdAt: toDate(raw.created_at),
-    updatedAt: toDate(raw.updated_at),
-    deletedAt: raw.deleted_at == null ? null : toDate(raw.deleted_at),
-  } as BaseRow;
-}
-
-function normaliseCellValue(value: unknown, spec: ColumnSpec): unknown {
-  if (value == null) return null;
-  switch (spec.ddlType) {
-    case 'VARCHAR':
-      return String(value);
-    case 'DOUBLE':
-      return typeof value === 'number' ? value : Number(value);
-    case 'BOOLEAN':
-      return Boolean(value);
-    case 'TIMESTAMPTZ': {
-      if (value instanceof Date) return value.toISOString();
-      return String(value);
-    }
-    case 'JSON': {
-      if (typeof value === 'string') {
-        try {
-          return JSON.parse(value);
-        } catch {
-          return value;
-        }
-      }
-      return value;
-    }
-    default:
-      return value;
-  }
-}
-
-function toDate(value: unknown): Date {
-  if (value instanceof Date) return value;
-  return new Date(String(value));
-}
-
-// System property type → system column on base_rows (mirrors the map in
-// collection-loader.ts). Kept local to avoid a circular import.
-const SYSTEM_PROPERTY_COLUMN_LOOKUP: Record<string, string> = {
-  [BasePropertyType.CREATED_AT]: 'createdAt',
-  [BasePropertyType.LAST_EDITED_AT]: 'updatedAt',
-  [BasePropertyType.LAST_EDITED_BY]: 'lastUpdatedById',
-};
-
-// Mirror of collection-loader's `readFromRow`, but keyed off a generic event
-// payload (which may be camelCase JSON because it came over EventEmitter /
-// Redis rather than straight from Kysely — both shapes round-trip through
-// here). The function tolerates both the wire shape and the repo shape.
-function readFromRowEvent(
-  row: Record<string, unknown>,
-  spec: ColumnSpec,
-): unknown {
-  switch (spec.column) {
-    case 'id':
-      return row.id;
-    case 'base_id':
-      return row.baseId ?? row.base_id;
-    case 'workspace_id':
-      return row.workspaceId ?? row.workspace_id;
-    case 'creator_id':
-      return row.creatorId ?? row.creator_id;
-    case 'position':
-      return row.position;
-    case 'created_at':
-      return row.createdAt ?? row.created_at;
-    case 'updated_at':
-      return row.updatedAt ?? row.updated_at;
-    case 'last_updated_by_id':
-      return row.lastUpdatedById ?? row.last_updated_by_id;
-    case 'deleted_at':
-      return null;
-    case 'search_text':
-      return '';
-  }
-
-  const prop = spec.property;
-  if (!prop) return null;
-
-  const sysColumn = SYSTEM_PROPERTY_COLUMN_LOOKUP[prop.type];
-  if (sysColumn) return row[sysColumn] ?? null;
-
-  const cells = (row.cells as Record<string, unknown> | null) ?? {};
-  return cells[prop.id] ?? null;
-}
 
 function quoteIdent(name: string): string {
   return `"${name.replace(/"/g, '""')}"`;
 }
+
+/*
+ * Convert a DuckDB row object back to the BaseRow JSON shape returned to
+ * API callers. Kept inline (not exported) because it's a pure derivation
+ * from the ColumnSpec list.
+ */
+function shapeBaseRow(
+  raw: Record<string, unknown>,
+  specs: ColumnSpec[],
+  sortBuilds: SortBuild[],
+): BaseRow {
+  const cells: Record<string, unknown> = {};
+  for (const spec of specs) {
+    if (!spec.property) continue;
+    const val = raw[spec.column];
+    if (val == null) continue;
+    if (spec.ddlType === 'JSON' && typeof val === 'string') {
+      try {
+        cells[spec.property.id] = JSON.parse(val);
+      } catch {
+        cells[spec.property.id] = val;
+      }
+    } else {
+      cells[spec.property.id] = val;
+    }
+  }
+  return {
+    id: raw.id as string,
+    baseId: raw.base_id as string,
+    workspaceId: raw.workspace_id as string,
+    creatorId: raw.creator_id as string,
+    position: raw.position as string,
+    createdAt: coerceDate(raw.created_at),
+    updatedAt: coerceDate(raw.updated_at),
+    lastUpdatedById: raw.last_updated_by_id as string,
+    deletedAt: null,
+    cells,
+  } as BaseRow;
+}
+
+function coerceDate(v: unknown): Date {
+  if (v instanceof Date) return v;
+  if (typeof v === 'string') return new Date(v);
+  return new Date(0);
+}
+
+function readFromRowEvent(
+  row: Record<string, unknown>,
+  spec: ColumnSpec,
+): unknown {
+  switch (spec.column) {
+    case 'id': return row.id ?? null;
+    case 'base_id': return row.baseId ?? row.base_id ?? null;
+    case 'workspace_id': return row.workspaceId ?? row.workspace_id ?? null;
+    case 'creator_id': return row.creatorId ?? row.creator_id ?? null;
+    case 'position': return row.position ?? null;
+    case 'created_at': return row.createdAt ?? row.created_at ?? null;
+    case 'updated_at': return row.updatedAt ?? row.updated_at ?? null;
+    case 'last_updated_by_id': return row.lastUpdatedById ?? row.last_updated_by_id ?? null;
+    case 'deleted_at': return null;
+    case 'search_text': return '';
  }
+  const prop = spec.property;
+  if (!prop) return null;
+  if (
+    prop.type === BasePropertyType.CREATED_AT ||
+    prop.type === BasePropertyType.LAST_EDITED_AT ||
+    prop.type === BasePropertyType.LAST_EDITED_BY
+  ) {
+    return null;
+  }
+  const cells = (row.cells as Record<string, unknown> | null) ?? {};
+  return cells[prop.id] ?? null;
+}
diff --git a/apps/server/src/core/base/query-cache/collection-loader.ts b/apps/server/src/core/base/query-cache/collection-loader.ts
index c4abbe23..c2d91ce2 100644
--- a/apps/server/src/core/base/query-cache/collection-loader.ts
+++ b/apps/server/src/core/base/query-cache/collection-loader.ts
@@ -1,13 +1,27 @@
 import { Injectable, Logger } from '@nestjs/common';
-import { DuckDBInstance } from '@duckdb/node-api';
 import { BaseRepo } from '@docmost/db/repos/base/base.repo';
 import { BasePropertyRepo } from '@docmost/db/repos/base/base-property.repo';
 import { buildColumnSpecs } from './column-types';
 import { buildLoaderSql } from './loader-sql';
-import { LoadedCollection } from './query-cache.types';
-import { PostgresExtensionService } from './postgres-extension.service';
+import { baseSchemaName } from './schema-name';
+import { DuckDbRuntime } from './duckdb-runtime';
 import { QueryCacheConfigProvider } from './query-cache.config';
+import { LoadedCollection } from './query-cache.types';
 
+/*
+ * Loads a base into the shared DuckDB runtime as an attached in-memory
+ * database (`<schema>.rows`). Steps:
+ *
+ * 1. Attach a per-base schema.
+ * 2. Run `CREATE TABLE <schema>.rows AS SELECT ... FROM postgres_query(...)`
+ *    via the writer connection — Postgres does the JSONB extraction.
+ * 3. Declare `PRIMARY KEY (id)` on the new table.
+ * 4. Build ART indexes on every indexable column.
+ * 5. Count rows and return a LoadedCollection metadata record.
+ *
+ * Error path: detach the schema before propagating the error, so we don't
+ * leak an empty attached DB into the runtime.
+ */
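+/*
+ * Illustrative shape of the attach + bulk load (hypothetical names — the
+ * real statements come from `baseSchemaName` and `buildLoaderSql`):
+ *
+ *   ATTACH ':memory:' AS b_0193fa2e;
+ *   CREATE TABLE b_0193fa2e.rows AS
+ *     SELECT * FROM postgres_query('pg', '<typed projection of base_rows>');
+ */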
 @Injectable()
 export class CollectionLoader {
   private readonly logger = new Logger(CollectionLoader.name);
@@ -15,89 +29,54 @@ export class CollectionLoader {
   constructor(
     private readonly baseRepo: BaseRepo,
     private readonly basePropertyRepo: BasePropertyRepo,
-    private readonly pgExtension: PostgresExtensionService,
+    private readonly runtime: DuckDbRuntime,
     private readonly config: QueryCacheConfigProvider,
   ) {}
 
   async load(baseId: string, workspaceId: string): Promise<LoadedCollection> {
-    if (!this.pgExtension.isReady()) {
+    if (!this.runtime.isReady()) {
       throw new Error(
-        `Cannot load collection ${baseId}: postgres extension not ready. ` +
-          'Check PostgresExtensionService bootstrap logs.',
+        `Cannot load collection ${baseId}: duckdb runtime not ready. ` +
+          `Check DuckDbRuntime bootstrap logs.`,
       );
     }
 
     const base = await this.baseRepo.findById(baseId);
-    if (!base) {
-      throw new Error(`Base ${baseId} not found`);
-    }
+    if (!base) throw new Error(`Base ${baseId} not found`);
     const schemaVersion = (base as any).schemaVersion ?? 1;
 
     const properties = await this.basePropertyRepo.findByBaseId(baseId);
     const specs = buildColumnSpecs(properties);
+    const schema = baseSchemaName(baseId);
 
-    const { memoryLimit, threads, tempDirectory } = this.config.config;
-
-    // Ensure the temp directory exists so DuckDB can spill to it.
-    // Swallow errors — if creation fails, DuckDB will fail its own sanity
-    // check and we'll log that instead of crashing here.
-    try {
-      const fs = require('node:fs');
-      fs.mkdirSync(tempDirectory, { recursive: true });
-    } catch {
-      /* swallow */
-    }
-
-    const instance = await DuckDBInstance.create(':memory:', {
-      memory_limit: memoryLimit,
-      threads: String(threads),
-      temp_directory: tempDirectory,
-    });
-    const connection = await instance.connect();
+    await this.runtime.attachBase(schema);
 
     try {
-      await this.pgExtension.configureOnConnection(connection);
+      const writer = this.runtime.getWriter();
 
-      // Disable insertion-order preservation during bulk load — DuckDB's docs
-      // explicitly recommend this for memory-pressure on large inserts. Our
-      // loader doesn't depend on the insertion order (we sort via indexes
-      // or keyset cursors later), so this is free memory savings.
-      await connection.run('SET preserve_insertion_order = false');
-
-      // Bulk load via CREATE TABLE AS SELECT. JSONB extraction happens
-      // server-side via the base_cell_* helpers; DuckDB streams typed
-      // columns over COPY BINARY into its vectorized insert path.
-      const sql = buildLoaderSql(specs, baseId, workspaceId);
+      const sql = buildLoaderSql(specs, baseId, workspaceId, schema);
       if (this.config.config.trace) {
         console.log(
           '[cache-trace]',
           JSON.stringify({
             phase: 'loader.sql',
             baseId,
+            schema,
             length: sql.length,
             sql,
           }),
         );
       }
-      await connection.run(sql);
+      await writer.run(sql);
 
-      // Release the PG connection held by the ATTACH — we're done with
-      // Postgres; all subsequent queries run purely against the local table.
-      await this.pgExtension.detach(connection);
+      await writer.run(`ALTER TABLE ${schema}.rows ADD PRIMARY KEY (id)`);
 
-      // CREATE TABLE AS copies data but not constraints. Re-declare the primary
-      // key so INSERT OR REPLACE (used by applyChange.upsertRow) has a conflict
-      // target. This also backs id lookups with an implicit index, speeding up
-      // per-row upsert/delete.
-      await connection.run('ALTER TABLE rows ADD PRIMARY KEY (id)');
-
-      // Build ART indexes on indexable columns.
       for (const spec of specs) {
         if (!spec.indexable) continue;
         const safe = spec.column.replace(/[^a-zA-Z0-9_]/g, '_');
         const tIdx = this.config.config.trace ? Date.now() : 0;
-        await connection.run(
-          `CREATE INDEX ${quoteIdent(`idx_${safe}`)} ON rows (${quoteIdent(spec.column)})`,
+        await writer.run(
+          `CREATE INDEX ${schema}_${safe}_idx ON ${schema}.rows (${quoteIdent(spec.column)})`,
         );
         if (this.config.config.trace) {
           console.log(
             '[cache-trace]',
@@ -105,6 +84,7 @@ export class CollectionLoader {
             JSON.stringify({
               phase: 'loader.index',
               baseId,
+              schema,
               column: spec.column,
               ms: Date.now() - tIdx,
             }),
@@ -112,67 +92,49 @@ export class CollectionLoader {
          );
        }
      }
 
-      const countResult = await connection.runAndReadAll(
-        'SELECT count(*) AS c FROM rows',
+      const countResult = await writer.runAndReadAll(
+        `SELECT count(*) AS c FROM ${schema}.rows`,
       );
       const rowCount = Number(
         (countResult.getRowObjects()[0] as { c: bigint | number }).c,
       );
 
-      const memoryResult = await connection.runAndReadAll(
-        `SELECT
-           COALESCE(sum(memory_usage_bytes), 0)::BIGINT AS used_bytes,
-           COALESCE(sum(temporary_storage_bytes), 0)::BIGINT AS spilled_bytes
-         FROM duckdb_memory()`,
-      );
-      const mem = memoryResult.getRowObjects()[0] as {
-        used_bytes: bigint | number;
-        spilled_bytes: bigint | number;
-      };
-      const heapBytes = Number(mem.used_bytes);
-      const spilledBytes = Number(mem.spilled_bytes);
+      const approxBytes = estimateBytes(rowCount, specs.length);
 
       this.logger.debug(
         `Loaded ${rowCount} rows for base ${baseId} ` +
-          `(schemaVersion=${schemaVersion}, heap=${fmtMb(heapBytes)}MB, spilled=${fmtMb(spilledBytes)}MB)`,
+          `(schemaVersion=${schemaVersion}, schema=${schema}, approxMB=${fmtMb(approxBytes)})`,
       );
 
       return {
         baseId,
+        schema,
         schemaVersion,
         columns: specs,
-        instance,
-        connection,
         lastAccessedAt: Date.now(),
         rowCount,
-        heapBytes,
-        spilledBytes,
+        approxBytes,
       };
     } catch (err) {
       try {
-        await this.pgExtension.detach(connection);
-      } catch {
-        /* swallow */
-      }
-      try {
-        connection.closeSync();
-      } catch {
-        /* swallow */
-      }
-      try {
-        instance.closeSync();
-      } catch {
-        /* swallow */
-      }
+        await this.runtime.detachBase(schema);
+      } catch { /* swallow */ }
       throw err;
     }
   }
 }
 
-function quoteIdent(name: string): string {
-  return `"${name.replace(/"/g, '""')}"`;
+function estimateBytes(rowCount: number, columnCount: number): number {
+  // Rough heuristic: ~64 bytes per cell (typed value + ART index entry
+  // overhead). Within 2x of actual for typical schemas; used for
+  // reporting only, not for eviction decisions.
+  return rowCount * columnCount * 64;
 }
 
 function fmtMb(bytes: number): string {
   return (bytes / (1024 * 1024)).toFixed(1);
 }
+
+function quoteIdent(name: string): string {
+  return `"${name.replace(/"/g, '""')}"`;
+}
diff --git a/apps/server/src/core/base/query-cache/postgres-extension.service.spec.ts b/apps/server/src/core/base/query-cache/postgres-extension.service.spec.ts
deleted file mode 100644
index e94037bd..00000000
--- a/apps/server/src/core/base/query-cache/postgres-extension.service.spec.ts
+++ /dev/null
@@ -1,129 +0,0 @@
-import { DuckDBInstance } from '@duckdb/node-api';
-import { PostgresExtensionService } from './postgres-extension.service';
-import { QueryCacheConfigProvider } from './query-cache.config';
-
-const makeConfig = (
-  overrides: Partial<QueryCacheConfigProvider['config']> = {},
-): QueryCacheConfigProvider =>
-  ({
-    config: {
-      enabled: true,
-      minRows: 25_000,
-      maxCollections: 50,
-      warmTopN: 50,
-      memoryLimit: '64MB',
-      threads: 2,
-      ...overrides,
-    },
-  }) as unknown as QueryCacheConfigProvider;
-
-const makeEnv = (
-  overrides: { dbUrl?: string } = {},
-): { getDatabaseURL: () => string } => ({
-  getDatabaseURL: () => overrides.dbUrl ?? process.env.DATABASE_URL ?? '',
-});
-
-describe('PostgresExtensionService', () => {
-  it('no-ops when the query cache is disabled', async () => {
-    const svc = new PostgresExtensionService(
-      makeConfig({ enabled: false }),
-      makeEnv() as any,
-    );
-    await expect(svc.onApplicationBootstrap()).resolves.toBeUndefined();
-    expect(svc.isReady()).toBe(false);
-  });
-
-  it('installs and loads the postgres extension on bootstrap when enabled', async () => {
-    const svc = new PostgresExtensionService(makeConfig(), makeEnv() as any);
-    // First run hits the network (extensions.duckdb.org). Subsequent runs read from cache.
-    await svc.onApplicationBootstrap();
-    expect(svc.isReady()).toBe(true);
-  });
-
-  it('configureOnConnection loads the extension and attaches pg in a fresh instance', async () => {
-    const svc = new PostgresExtensionService(makeConfig(), makeEnv() as any);
-    await svc.onApplicationBootstrap();
-
-    const instance = await DuckDBInstance.create(':memory:');
-    const conn = await instance.connect();
-    try {
-      await svc.configureOnConnection(conn);
-      // Smoke-test: query any PG system table. DuckDB's postgres scanner
-      // exposes PG catalog tables under the attached schema's pg_catalog.
-      const res = await conn.runAndReadAll(
-        'SELECT count(*) AS c FROM pg.pg_catalog.pg_database',
-      );
-      const row = res.getRowObjects()[0] as { c: bigint | number };
-      expect(Number(row.c)).toBeGreaterThan(0);
-      await svc.detach(conn);
-    } finally {
-      conn.closeSync();
-      instance.closeSync();
-    }
-  });
-
-  it('detach is idempotent', async () => {
-    const svc = new PostgresExtensionService(makeConfig(), makeEnv() as any);
-    await svc.onApplicationBootstrap();
-
-    const instance = await DuckDBInstance.create(':memory:');
-    const conn = await instance.connect();
-    try {
-      await svc.configureOnConnection(conn);
-      await svc.detach(conn);
-      await expect(svc.detach(conn)).resolves.toBeUndefined();
-    } finally {
-      conn.closeSync();
-      instance.closeSync();
-    }
-  });
-
-  it('configureOnConnection throws a clear error when bootstrap never ran', async () => {
-    const svc = new PostgresExtensionService(makeConfig(), makeEnv() as any);
-    // Intentionally NOT calling onApplicationBootstrap.
-    const instance = await DuckDBInstance.create(':memory:');
-    const conn = await instance.connect();
-    try {
-      await expect(svc.configureOnConnection(conn)).rejects.toThrow(/not ready/i);
-    } finally {
-      conn.closeSync();
-      instance.closeSync();
-    }
-  });
-
-  it('includes the bootstrap failure reason in the not-ready error', async () => {
-    // Force bootstrap to fail by giving the service a broken DB URL so that
-    // LOAD postgres still succeeds but something in the bootstrap path throws.
-    // Simplest reliable failure: monkey-patch the service so its bootstrap
-    // runs a SQL statement that cannot succeed. We accept a small amount of
-    // test-only access by subclassing.
-
-    class BreakingService extends PostgresExtensionService {
-      async onApplicationBootstrap(): Promise<void> {
-        // Call super to keep the gate logic, but sabotage inside by
-        // running INSTALL on a closed connection via a try-wrapper that
-        // throws synchronously and is captured by the parent catch.
-        // Simplest approach: directly set the failure and leave ready=false.
-        (this as any).ready = false;
-        (this as any).bootstrapFailure = 'simulated boot failure XYZ';
-      }
-    }
-
-    const svc = new BreakingService(
-      makeConfig(),
-      makeEnv() as any,
-    );
-    await svc.onApplicationBootstrap();
-
-    const instance = await DuckDBInstance.create(':memory:');
-    const conn = await instance.connect();
-    try {
-      await expect(svc.configureOnConnection(conn)).rejects.toThrow(
-        /simulated boot failure XYZ/,
-      );
-    } finally {
-      conn.closeSync();
-      instance.closeSync();
-    }
-  });
-});
diff --git a/apps/server/src/core/base/query-cache/postgres-extension.service.ts b/apps/server/src/core/base/query-cache/postgres-extension.service.ts
deleted file mode 100644
index 51ed0af3..00000000
--- a/apps/server/src/core/base/query-cache/postgres-extension.service.ts
+++ /dev/null
@@ -1,134 +0,0 @@
-import {
-  Injectable,
-  Logger,
-  OnApplicationBootstrap,
-} from '@nestjs/common';
-import { DuckDBInstance, DuckDBConnection } from '@duckdb/node-api';
-import { QueryCacheConfigProvider } from './query-cache.config';
-import { EnvironmentService } from '../../../integrations/environment/environment.service';
-
-/*
- * Owns the lifecycle of DuckDB's `postgres` extension for the query-cache
- * module. Responsibilities:
- *
- * 1. Install the extension once per process at bootstrap. DuckDB caches the
- *    binary to `$HOME/.duckdb/extensions/...`; subsequent LOADs are offline.
- *    We use the default DuckDB install path (fetches from
- *    `extensions.duckdb.org`) — air-gapped bundling is a separate plan.
- *
- * 2. Configure a fresh DuckDBConnection so a caller can run a single bulk
- *    load query against Postgres via `CREATE TABLE AS SELECT ... FROM pg.*`.
- *    We ATTACH `pg` in READ_ONLY mode using the connection URI inline,
- *    scoped to the DuckDB instance, with no disk state.
- *
- * 3. DETACH on request so the underlying PG connection is released
- *    immediately after the load completes. Per-instance PG attachments are
- *    transient: held only during CREATE TABLE AS, never across queries.
- *
- * When the master query-cache flag is off, this service is a no-op. No
- * instance is created, no network call is made.
- */
-@Injectable()
-export class PostgresExtensionService implements OnApplicationBootstrap {
-  private readonly logger = new Logger(PostgresExtensionService.name);
-  private ready = false;
-  private bootstrapFailure: string | null = null;
-
-  constructor(
-    private readonly config: QueryCacheConfigProvider,
-    private readonly env: EnvironmentService,
-  ) {}
-
-  async onApplicationBootstrap(): Promise<void> {
-    if (!this.config.config.enabled) {
-      this.logger.log('query cache disabled; skipping postgres extension install');
-      return;
-    }
-
-    const bootstrap = await DuckDBInstance.create(':memory:');
-    const conn = await bootstrap.connect();
-    try {
-      // INSTALL writes to $HOME/.duckdb/extensions/<version>/<platform>/.
-      // First ever boot: fetches from extensions.duckdb.org. Subsequent boots:
-      // local-disk no-op.
-      await conn.run('INSTALL postgres');
-      await conn.run('LOAD postgres');
-      this.ready = true;
-      this.logger.log('postgres extension installed and loaded');
-    } catch (err) {
-      const error = err as Error;
-      this.logger.error(
-        `Failed to install/load postgres extension: ${error.message}`,
-      );
-      if (error.stack) this.logger.error(error.stack);
-      // Do NOT rethrow. A failed extension install must not crash the whole
-      // app: the cache service handles this by falling through to Postgres
-      // when `isReady()` returns false (see `CollectionLoader.load`).
-      this.ready = false;
-      this.bootstrapFailure = error.message;
-    } finally {
-      conn.closeSync();
-      bootstrap.closeSync();
-    }
-  }
-
-  isReady(): boolean {
-    return this.ready;
-  }
-
-  /*
-   * Prepares a fresh DuckDBConnection for a bulk-load query against Postgres.
-   * Must be paired with `detach()` once CREATE TABLE AS completes.
-   *
-   * Safe to call on a just-created instance: LOAD reads from the on-disk
-   * extension cache populated at bootstrap (no network call).
-   */
-  async configureOnConnection(conn: DuckDBConnection): Promise<void> {
-    if (!this.ready) {
-      const reason = this.bootstrapFailure
-        ? `: ${this.bootstrapFailure}`
-        : '';
-      throw new Error(
-        `PostgresExtensionService not ready${reason}. Check bootstrap logs.`,
-      );
-    }
-
-    const dbUrl = this.env.getDatabaseURL();
-    if (!dbUrl) {
-      throw new Error('DATABASE_URL is empty; cannot ATTACH from duckdb');
-    }
-
-    await conn.run('LOAD postgres');
-
-    // DuckDB 1.5's `postgres` extension secret syntax expects discrete
-    // HOST/PORT/etc. parameters and rejects the single CONNECTION_STRING
-    // field. Passing the URI directly as ATTACH's first argument keeps the
-    // connection details opaque here and still binds the attachment to this
-    // DuckDB instance only. READ_ONLY guards against the loader accidentally
-    // mutating Postgres.
-    await conn.run(
-      `ATTACH ${escapeSqlString(dbUrl)} AS pg (TYPE POSTGRES, READ_ONLY)`,
-    );
-  }
-
-  /*
-   * Releases the PG connection held by this DuckDBConnection's ATTACH.
-   * Idempotent — safe to call repeatedly, swallows "not attached" errors.
-   */
-  async detach(conn: DuckDBConnection): Promise<void> {
-    try {
-      await conn.run('DETACH pg');
-    } catch (err) {
-      const msg = (err as Error).message ?? '';
-      // DuckDB wording: "Failed to detach database with name \"pg\": database
-      // not found". Also handle older "not attached" / catalog-error shapes.
-      if (!/not attached|not found|does not exist|catalog|failed to detach/i.test(msg)) {
-        throw err;
-      }
-    }
-  }
-}
-
-function escapeSqlString(s: string): string {
-  return `'${s.replace(/'/g, "''")}'`;
-}
diff --git a/apps/server/src/core/base/query-cache/query-cache.module.ts b/apps/server/src/core/base/query-cache/query-cache.module.ts
index 4ffd9de8..86771670 100644
--- a/apps/server/src/core/base/query-cache/query-cache.module.ts
+++ b/apps/server/src/core/base/query-cache/query-cache.module.ts
@@ -1,27 +1,27 @@
 import { Module } from '@nestjs/common';
 import { QueryCacheConfigProvider } from './query-cache.config';
+import { DuckDbRuntime } from './duckdb-runtime';
 import { BaseQueryCacheService } from './base-query-cache.service';
 import { BaseQueryRouter } from './base-query-router';
 import { CollectionLoader } from './collection-loader';
 import { BaseQueryCacheWriteConsumer } from './base-query-cache.write-consumer';
 import { BaseQueryCacheSubscriber } from './base-query-cache.subscriber';
-import { PostgresExtensionService } from './postgres-extension.service';
 
 @Module({
   providers: [
     QueryCacheConfigProvider,
-    PostgresExtensionService,
+    DuckDbRuntime,
+    CollectionLoader,
     BaseQueryCacheService,
     BaseQueryRouter,
-    CollectionLoader,
     BaseQueryCacheWriteConsumer,
     BaseQueryCacheSubscriber,
   ],
   exports: [
     BaseQueryCacheService,
     BaseQueryRouter,
+    DuckDbRuntime,
     QueryCacheConfigProvider,
-    PostgresExtensionService,
   ],
 })
-export class BaseQueryCacheModule {}
+export class QueryCacheModule {}
diff --git a/apps/server/src/core/base/query-cache/query-cache.types.ts b/apps/server/src/core/base/query-cache/query-cache.types.ts
index 256dfd9d..5a17bc7e 100644
--- a/apps/server/src/core/base/query-cache/query-cache.types.ts
+++ b/apps/server/src/core/base/query-cache/query-cache.types.ts
@@ -1,4 +1,3 @@
-import type { DuckDBConnection, DuckDBInstance } from '@duckdb/node-api';
 import type { BaseProperty } from '@docmost/db/types/entity.types';
 
 export type DuckDbColumnType =
@@ -9,31 +8,37 @@ export type DuckDbColumnType =
   | 'JSON';
 
 export type ColumnSpec = {
-  // The uuid of the property (user-defined props) or a stable literal
-  // ('id', 'position', 'created_at', 'updated_at', 'last_updated_by_id',
-  // 'deleted_at', 'search_text') for system columns.
+  /*
+   * The uuid of the property (user-defined props) or a stable literal
+   * ('id', 'position', 'created_at', 'updated_at', 'last_updated_by_id',
+   * 'deleted_at', 'search_text') for system columns.
+   */
   column: string;
   ddlType: DuckDbColumnType;
   indexable: boolean;
-  // For user-defined props we keep the source BaseProperty so callers can
-  // resolve the extraction rule from JSON.
   property?: Pick<BaseProperty, 'id' | 'type'>;
 };
 
+/*
+ * A base held in the shared DuckDB instance. Instead of owning a
+ * `DuckDBInstance` and `DuckDBConnection`, it now just remembers the schema
+ * name of its attached in-memory database. The runtime owns the actual
+ * connections; this is pure metadata.
+ */
 export type LoadedCollection = {
   baseId: string;
+  schema: string; // e.g. "b_019c69a51d847985a7f68ee2871d8669"
   schemaVersion: number;
   columns: ColumnSpec[];
-  instance: DuckDBInstance;
-  connection: DuckDBConnection;
   lastAccessedAt: number;
-  // cached; set by loader, maintained by applyChange
   rowCount: number;
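+  // Set by the loader; applyChange / refreshRowCount keep it roughly
+  // current, so treat it as approximate between refreshes.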
-  // Memory stats captured immediately after load. Static until next
-  // explicit refresh — see `BaseQueryCacheService.refreshMemoryStats` if you
-  // need up-to-date figures after many applyChange() mutations.
-  heapBytes: number;
-  spilledBytes: number;
+  /*
+   * Estimated in-memory footprint, in bytes. DuckDB does not expose
+   * per-attached-db memory accounting, so this is a rough heuristic
+   * computed at load time: rowCount × columns.length × ~64 bytes. Used
+   * for cache-size reporting; not for eviction decisions.
+   */
+  approxBytes: number;
 };
 
 export type ChangeEnvelope =