From 4437dcbb6266b17068d6cb6bf15b49c8597ff0f2 Mon Sep 17 00:00:00 2001 From: Philipinho <16838612+Philipinho@users.noreply.github.com> Date: Thu, 23 Apr 2026 16:23:24 +0100 Subject: [PATCH] feat(base): single-instance duckdb runtime with writer + reader pool --- .../base/query-cache/duckdb-runtime.spec.ts | 117 ++++++++++ .../core/base/query-cache/duckdb-runtime.ts | 211 ++++++++++++++++++ 2 files changed, 328 insertions(+) create mode 100644 apps/server/src/core/base/query-cache/duckdb-runtime.spec.ts create mode 100644 apps/server/src/core/base/query-cache/duckdb-runtime.ts diff --git a/apps/server/src/core/base/query-cache/duckdb-runtime.spec.ts b/apps/server/src/core/base/query-cache/duckdb-runtime.spec.ts new file mode 100644 index 00000000..58696153 --- /dev/null +++ b/apps/server/src/core/base/query-cache/duckdb-runtime.spec.ts @@ -0,0 +1,117 @@ +import { DuckDbRuntime } from './duckdb-runtime'; +import { QueryCacheConfigProvider } from './query-cache.config'; + +const makeConfig = ( + overrides: Partial = {}, +): QueryCacheConfigProvider => + ({ + config: { + enabled: true, + minRows: 25_000, + maxCollections: 50, + warmTopN: 50, + memoryLimit: '256MB', + threads: 2, + tempDirectory: `${require('node:os').tmpdir()}/docmost-duckdb-runtime-test`, + trace: false, + readerPoolSize: 2, + ...overrides, + }, + }) as unknown as QueryCacheConfigProvider; + +const makeEnv = (): { getDatabaseURL: () => string } => ({ + getDatabaseURL: () => process.env.DATABASE_URL ?? '', +}); + +describe('DuckDbRuntime', () => { + it('no-ops when the cache is disabled', async () => { + const rt = new DuckDbRuntime(makeConfig({ enabled: false }), makeEnv() as any); + await rt.onApplicationBootstrap(); + expect(rt.isReady()).toBe(false); + await rt.onModuleDestroy(); + }); + + it('bootstraps instance, extension, PG attach, and reader pool', async () => { + const rt = new DuckDbRuntime(makeConfig(), makeEnv() as any); + await rt.onApplicationBootstrap(); + expect(rt.isReady()).toBe(true); + expect(rt.readerPoolSize()).toBe(2); + await rt.onModuleDestroy(); + }); + + it('attachBase creates a per-base schema and detachBase removes it', async () => { + const rt = new DuckDbRuntime(makeConfig(), makeEnv() as any); + await rt.onApplicationBootstrap(); + try { + const schema = 'b_testaaaaaaaaaaaaaaaaaaaaaaaaaa'; + await rt.attachBase(schema); + await rt.getWriter().run(`CREATE TABLE ${schema}.t (x INTEGER)`); + await rt.getWriter().run(`INSERT INTO ${schema}.t VALUES (1), (2), (3)`); + const res = await rt + .getWriter() + .runAndReadAll(`SELECT count(*) AS c FROM ${schema}.t`); + const row = res.getRowObjects()[0] as { c: bigint | number }; + expect(Number(row.c)).toBe(3); + + await rt.detachBase(schema); + await expect( + rt.getWriter().run(`SELECT count(*) FROM ${schema}.t`), + ).rejects.toThrow(); + } finally { + await rt.onModuleDestroy(); + } + }); + + it('withReader parallelises across pool', async () => { + const rt = new DuckDbRuntime(makeConfig({ readerPoolSize: 2 }), makeEnv() as any); + await rt.onApplicationBootstrap(); + try { + const started: string[] = []; + const ended: string[] = []; + const p1 = rt.withReader(async (conn) => { + started.push('a'); + await new Promise((r) => setTimeout(r, 50)); + await conn.runAndReadAll('SELECT 1'); + ended.push('a'); + }); + const p2 = rt.withReader(async (conn) => { + started.push('b'); + await new Promise((r) => setTimeout(r, 50)); + await conn.runAndReadAll('SELECT 1'); + ended.push('b'); + }); + await Promise.all([p1, p2]); + expect(new Set(started)).toEqual(new Set(['a', 'b'])); + expect(started.length).toBe(2); + expect(ended.length).toBe(2); + } finally { + await rt.onModuleDestroy(); + } + }); + + it('withReader on a 3rd concurrent request with pool=2 queues correctly', async () => { + const rt = new DuckDbRuntime(makeConfig({ readerPoolSize: 2 }), makeEnv() as any); + await rt.onApplicationBootstrap(); + try { + const order: number[] = []; + const makeOne = (n: number, delayMs: number) => + rt.withReader(async () => { + await new Promise((r) => setTimeout(r, delayMs)); + order.push(n); + }); + const p1 = makeOne(1, 40); + const p2 = makeOne(2, 40); + const p3 = makeOne(3, 5); + await Promise.all([p1, p2, p3]); + expect(order.length).toBe(3); + expect(order.indexOf(3)).toBeGreaterThan(0); + } finally { + await rt.onModuleDestroy(); + } + }); + + it('getWriter throws if not ready', () => { + const rt = new DuckDbRuntime(makeConfig(), makeEnv() as any); + expect(() => rt.getWriter()).toThrow(/not ready/i); + }); +}); diff --git a/apps/server/src/core/base/query-cache/duckdb-runtime.ts b/apps/server/src/core/base/query-cache/duckdb-runtime.ts new file mode 100644 index 00000000..a62a1003 --- /dev/null +++ b/apps/server/src/core/base/query-cache/duckdb-runtime.ts @@ -0,0 +1,211 @@ +import { + Injectable, + Logger, + OnApplicationBootstrap, + OnModuleDestroy, +} from '@nestjs/common'; +import { DuckDBInstance, DuckDBConnection } from '@duckdb/node-api'; +import * as fs from 'node:fs'; +import { QueryCacheConfigProvider } from './query-cache.config'; +import { EnvironmentService } from '../../../integrations/environment/environment.service'; +import { ConnectionPool } from './connection-pool'; + +/* + * DuckDbRuntime + * ------------- + * Owns the process-wide DuckDB instance and everything attached to it: + * + * - One `DuckDBInstance` at `:memory:` with `memory_limit`, `threads`, + * `temp_directory` configured from env. + * - One writer `DuckDBConnection` for ATTACH/DETACH/CREATE TABLE/INSERT. + * - A pool of N reader connections for SELECTs; `withReader(fn)` lends + * one out, runs the callback, returns it — fair FIFO under contention. + * - The `postgres` extension is installed + loaded once, not per-base. + * - A single long-lived ATTACH against Postgres (READ_ONLY). All loaders + * reference `postgres_query('pg', $pgsql$ ... $pgsql$)` without doing + * their own attach/detach. + * + * When the query cache is disabled (`config.enabled === false`), the + * runtime is a no-op: nothing is created, `isReady()` returns false, and + * every consumer's own gate prevents it from touching the runtime. + */ +@Injectable() +export class DuckDbRuntime implements OnApplicationBootstrap, OnModuleDestroy { + private readonly logger = new Logger(DuckDbRuntime.name); + private instance: DuckDBInstance | null = null; + private writer: DuckDBConnection | null = null; + private readonly readerPool = new ConnectionPool(); + private readonly attachedSchemas = new Set(); + private ready = false; + private bootstrapFailure: string | null = null; + + constructor( + private readonly configProvider: QueryCacheConfigProvider, + private readonly env: EnvironmentService, + ) {} + + async onApplicationBootstrap(): Promise { + const config = this.configProvider.config; + if (!config.enabled) { + this.logger.log('query cache disabled; skipping duckdb runtime bootstrap'); + return; + } + + const dbUrl = this.env.getDatabaseURL(); + if (!dbUrl) { + this.bootstrapFailure = 'DATABASE_URL is empty'; + this.logger.error('DuckDbRuntime cannot bootstrap: DATABASE_URL is empty'); + return; + } + + try { + fs.mkdirSync(config.tempDirectory, { recursive: true }); + } catch { + /* swallow */ + } + + try { + this.instance = await DuckDBInstance.create(':memory:', { + memory_limit: config.memoryLimit, + threads: String(config.threads), + temp_directory: config.tempDirectory, + }); + + this.writer = await this.instance.connect(); + await this.writer.run('SET preserve_insertion_order = false'); + await this.writer.run('INSTALL postgres'); + await this.writer.run('LOAD postgres'); + await this.writer.run( + `ATTACH ${escapeSqlString(dbUrl)} AS pg (TYPE POSTGRES, READ_ONLY)`, + ); + + const readers: DuckDBConnection[] = []; + for (let i = 0; i < config.readerPoolSize; i++) { + const reader = await this.instance.connect(); + await reader.run('SET preserve_insertion_order = false'); + readers.push(reader); + } + this.readerPool.init(readers); + + this.ready = true; + this.logger.log( + `DuckDbRuntime ready (readers=${config.readerPoolSize}, memory_limit=${config.memoryLimit})`, + ); + } catch (err) { + const error = err as Error; + this.bootstrapFailure = error.message; + this.logger.error(`DuckDbRuntime bootstrap failed: ${error.message}`); + if (error.stack) this.logger.error(error.stack); + this.ready = false; + try { + this.readerPool.close().forEach((c) => c.closeSync()); + } catch { /* swallow */ } + try { + this.writer?.closeSync(); + } catch { /* swallow */ } + try { + this.instance?.closeSync(); + } catch { /* swallow */ } + this.writer = null; + this.instance = null; + } + } + + async onModuleDestroy(): Promise { + for (const c of this.readerPool.close()) { + try { + c.closeSync(); + } catch { /* swallow */ } + } + if (this.writer) { + try { + this.writer.closeSync(); + } catch { /* swallow */ } + this.writer = null; + } + if (this.instance) { + try { + this.instance.closeSync(); + } catch { /* swallow */ } + this.instance = null; + } + this.attachedSchemas.clear(); + this.ready = false; + } + + isReady(): boolean { + return this.ready; + } + + readerPoolSize(): number { + return this.readerPool.size(); + } + + lastBootstrapFailure(): string | null { + return this.bootstrapFailure; + } + + /* + * Attach a new in-memory database for a base. Idempotent: if the schema + * is already attached, this is a no-op. Schema name must come from + * `baseSchemaName()` — validated by the caller; we check shape here + * as defense-in-depth. + */ + async attachBase(schema: string): Promise { + this.requireReady(); + this.requireSchemaShape(schema); + if (this.attachedSchemas.has(schema)) return; + + await this.writer!.run(`ATTACH ':memory:' AS ${schema}`); + this.attachedSchemas.add(schema); + } + + /* + * Detach an in-memory database. Idempotent: detaching a non-attached + * schema is a swallow. Frees all memory held by the attached DB back + * to the shared buffer pool. + */ + async detachBase(schema: string): Promise { + if (!this.ready || !this.writer) return; + this.requireSchemaShape(schema); + if (!this.attachedSchemas.has(schema)) return; + + try { + await this.writer.run(`DETACH DATABASE ${schema}`); + } catch (err) { + const msg = (err as Error).message ?? ''; + if (!/not attached|does not exist|unknown database/i.test(msg)) { + throw err; + } + } finally { + this.attachedSchemas.delete(schema); + } + } + + getWriter(): DuckDBConnection { + this.requireReady(); + return this.writer!; + } + + async withReader(fn: (conn: DuckDBConnection) => Promise): Promise { + this.requireReady(); + return this.readerPool.withResource(fn); + } + + private requireReady(): void { + if (!this.ready || !this.writer) { + const detail = this.bootstrapFailure ? `: ${this.bootstrapFailure}` : ''; + throw new Error(`DuckDbRuntime not ready${detail}`); + } + } + + private requireSchemaShape(schema: string): void { + if (!/^[a-zA-Z_][a-zA-Z0-9_]*$/.test(schema)) { + throw new Error(`Invalid schema name "${schema}"`); + } + } +} + +function escapeSqlString(s: string): string { + return `'${s.replace(/'/g, "''")}'`; +}