diff --git a/apps/server/src/core/base/query-cache/postgres-extension.service.spec.ts b/apps/server/src/core/base/query-cache/postgres-extension.service.spec.ts
new file mode 100644
index 00000000..dfa27b8d
--- /dev/null
+++ b/apps/server/src/core/base/query-cache/postgres-extension.service.spec.ts
@@ -0,0 +1,93 @@
+import { DuckDBInstance } from '@duckdb/node-api';
+import { PostgresExtensionService } from './postgres-extension.service';
+import { QueryCacheConfigProvider } from './query-cache.config';
+
+const makeConfig = (
+  overrides: Partial<QueryCacheConfigProvider['config']> = {},
+): QueryCacheConfigProvider =>
+  ({
+    config: {
+      enabled: true,
+      minRows: 25_000,
+      maxCollections: 50,
+      warmTopN: 50,
+      memoryLimit: '64MB',
+      threads: 2,
+      ...overrides,
+    },
+  }) as unknown as QueryCacheConfigProvider;
+
+const makeEnv = (
+  overrides: { dbUrl?: string } = {},
+): { getDatabaseURL: () => string } => ({
+  getDatabaseURL: () => overrides.dbUrl ?? process.env.DATABASE_URL ?? '',
+});
+
+describe('PostgresExtensionService', () => {
+  it('no-ops when the query cache is disabled', async () => {
+    const svc = new PostgresExtensionService(
+      makeConfig({ enabled: false }),
+      makeEnv() as any,
+    );
+    await expect(svc.onApplicationBootstrap()).resolves.toBeUndefined();
+    expect(svc.isReady()).toBe(false);
+  });
+
+  it('installs and loads the postgres extension on bootstrap when enabled', async () => {
+    const svc = new PostgresExtensionService(makeConfig(), makeEnv() as any);
+    // First run hits the network (extensions.duckdb.org). Subsequent runs read from cache.
+    await svc.onApplicationBootstrap();
+    expect(svc.isReady()).toBe(true);
+  });
+
+  it('configureOnConnection loads the extension and attaches pg in a fresh instance', async () => {
+    const svc = new PostgresExtensionService(makeConfig(), makeEnv() as any);
+    await svc.onApplicationBootstrap();
+
+    const instance = await DuckDBInstance.create(':memory:');
+    const conn = await instance.connect();
+    try {
+      await svc.configureOnConnection(conn);
+      // Smoke-test: query any PG system table. DuckDB's postgres scanner
+      // exposes PG catalog tables under the attached schema's pg_catalog.
+      const res = await conn.runAndReadAll(
+        'SELECT count(*) AS c FROM pg.pg_catalog.pg_database',
+      );
+      const row = res.getRowObjects()[0] as { c: bigint | number };
+      expect(Number(row.c)).toBeGreaterThan(0);
+      await svc.detach(conn);
+    } finally {
+      conn.closeSync();
+      instance.closeSync();
+    }
+  });
+
+  it('detach is idempotent', async () => {
+    const svc = new PostgresExtensionService(makeConfig(), makeEnv() as any);
+    await svc.onApplicationBootstrap();
+
+    const instance = await DuckDBInstance.create(':memory:');
+    const conn = await instance.connect();
+    try {
+      await svc.configureOnConnection(conn);
+      await svc.detach(conn);
+      await expect(svc.detach(conn)).resolves.toBeUndefined();
+    } finally {
+      conn.closeSync();
+      instance.closeSync();
+    }
+  });
+
+  it('configureOnConnection throws a clear error when bootstrap never ran', async () => {
+    const svc = new PostgresExtensionService(makeConfig(), makeEnv() as any);
+    // Intentionally NOT calling onApplicationBootstrap.
+    const instance = await DuckDBInstance.create(':memory:');
+    const conn = await instance.connect();
+    try {
+      await expect(svc.configureOnConnection(conn)).rejects.toThrow(/not ready/i);
+    } finally {
+      conn.closeSync();
+      instance.closeSync();
+    }
+  });
+});
diff --git a/apps/server/src/core/base/query-cache/postgres-extension.service.ts b/apps/server/src/core/base/query-cache/postgres-extension.service.ts
new file mode 100644
index 00000000..45813f25
--- /dev/null
+++ b/apps/server/src/core/base/query-cache/postgres-extension.service.ts
@@ -0,0 +1,143 @@
+import {
+  Injectable,
+  Logger,
+  OnApplicationBootstrap,
+} from '@nestjs/common';
+import { DuckDBInstance, DuckDBConnection } from '@duckdb/node-api';
+import { QueryCacheConfigProvider } from './query-cache.config';
+import { EnvironmentService } from '../../../integrations/environment/environment.service';
+
+/**
+ * Owns the lifecycle of DuckDB's `postgres` extension for the query-cache
+ * module. Responsibilities:
+ *
+ * 1. Install the extension once per process at bootstrap. DuckDB caches the
+ *    binary to `$HOME/.duckdb/extensions/...`; subsequent LOADs are offline.
+ *    We use the default DuckDB install path (fetches from
+ *    `extensions.duckdb.org`) — air-gapped bundling is a separate plan.
+ *
+ * 2. Configure a fresh DuckDBConnection so a caller can run a single bulk
+ *    load query against Postgres via `CREATE TABLE AS SELECT ... FROM pg.*`.
+ *    We ATTACH `pg` in READ_ONLY mode using the connection URI inline,
+ *    scoped to the DuckDB instance, with no disk state.
+ *
+ * 3. DETACH on request so the underlying PG connection is released
+ *    immediately after the load completes. Per-instance PG attachments are
+ *    transient: held only during CREATE TABLE AS, never across queries.
+ *
+ * When the master query-cache flag is off, this service is a no-op. No
+ * instance is created, no network call is made.
+ */
+@Injectable()
+export class PostgresExtensionService implements OnApplicationBootstrap {
+  private readonly logger = new Logger(PostgresExtensionService.name);
+  private ready = false;
+
+  constructor(
+    private readonly config: QueryCacheConfigProvider,
+    private readonly env: EnvironmentService,
+  ) {}
+
+  /**
+   * Installs and loads the `postgres` extension once per process using a
+   * throwaway in-memory DuckDB instance. Failures to create the instance,
+   * connect, INSTALL or LOAD are logged and leave `isReady()` false.
+   */
+  async onApplicationBootstrap(): Promise<void> {
+    if (!this.config.config.enabled) {
+      this.logger.log('query cache disabled; skipping postgres extension install');
+      return;
+    }
+
+    // Declared outside the `try` so `finally` can release whatever was
+    // actually created, even when create()/connect() is the call that failed.
+    let bootstrap: DuckDBInstance | undefined;
+    let conn: DuckDBConnection | undefined;
+    try {
+      bootstrap = await DuckDBInstance.create(':memory:');
+      conn = await bootstrap.connect();
+      // INSTALL writes to $HOME/.duckdb/extensions/<version>/<platform>/.
+      // First ever boot: fetches from extensions.duckdb.org. Subsequent boots:
+      // local-disk no-op.
+      await conn.run('INSTALL postgres');
+      await conn.run('LOAD postgres');
+      this.ready = true;
+      this.logger.log('postgres extension installed and loaded');
+    } catch (err) {
+      const error = err as Error;
+      this.logger.error(
+        `Failed to install/load postgres extension: ${error.message}`,
+      );
+      if (error.stack) this.logger.error(error.stack);
+      // Do NOT rethrow. A failed extension install must not crash the whole
+      // app: the cache service handles this by falling through to Postgres
+      // when `isReady()` returns false (see `CollectionLoader.load`).
+      this.ready = false;
+    } finally {
+      // closeSync() is synchronous; there is nothing to await.
+      conn?.closeSync();
+      bootstrap?.closeSync();
+    }
+  }
+
+  /** Whether the extension was installed and loaded at bootstrap. */
+  isReady(): boolean {
+    return this.ready;
+  }
+
+  /**
+   * Prepares a fresh DuckDBConnection for a bulk-load query against Postgres.
+   * Must be paired with `detach()` once CREATE TABLE AS completes.
+   *
+   * Safe to call on a just-created instance: LOAD reads from the on-disk
+   * extension cache populated at bootstrap (no network call).
+   *
+   * @throws Error when bootstrap did not succeed or DATABASE_URL is empty.
+   */
+  async configureOnConnection(conn: DuckDBConnection): Promise<void> {
+    if (!this.ready) {
+      throw new Error(
+        'PostgresExtensionService not ready — check bootstrap logs',
+      );
+    }
+
+    const dbUrl = this.env.getDatabaseURL();
+    if (!dbUrl) {
+      throw new Error('DATABASE_URL is empty; cannot ATTACH from duckdb');
+    }
+
+    await conn.run('LOAD postgres');
+
+    // DuckDB 1.5's `postgres` extension secret syntax expects discrete
+    // HOST/PORT/etc. parameters and rejects the single CONNECTION_STRING
+    // field. Passing the URI directly as ATTACH's first argument keeps the
+    // connection details opaque here and still binds the attachment to this
+    // DuckDB instance only. READ_ONLY guards against the loader accidentally
+    // mutating Postgres.
+    await conn.run(
+      `ATTACH ${escapeSqlString(dbUrl)} AS pg (TYPE POSTGRES, READ_ONLY)`,
+    );
+  }
+
+  /**
+   * Releases the PG connection held by this DuckDBConnection's ATTACH.
+   * Idempotent — safe to call repeatedly, swallows "not attached" errors.
+   */
+  async detach(conn: DuckDBConnection): Promise<void> {
+    try {
+      await conn.run('DETACH pg');
+    } catch (err) {
+      const msg = (err as Error).message ?? '';
+      // DuckDB wording: "Failed to detach database with name \"pg\": database
+      // not found". Also handle older "not attached" / catalog-error shapes.
+      if (!/not attached|not found|does not exist|catalog|failed to detach/i.test(msg)) {
+        throw err;
+      }
+    }
+  }
+}
+
+/** Quotes `s` as a SQL string literal, doubling embedded single quotes. */
+function escapeSqlString(s: string): string {
+  return `'${s.replace(/'/g, "''")}'`;
+}
diff --git a/apps/server/src/core/base/query-cache/query-cache.module.ts b/apps/server/src/core/base/query-cache/query-cache.module.ts
index 0daa7a25..4ffd9de8 100644
--- a/apps/server/src/core/base/query-cache/query-cache.module.ts
+++ b/apps/server/src/core/base/query-cache/query-cache.module.ts
@@ -5,16 +5,23 @@ import { BaseQueryRouter } from './base-query-router';
 import { CollectionLoader } from './collection-loader';
 import { BaseQueryCacheWriteConsumer } from './base-query-cache.write-consumer';
 import { BaseQueryCacheSubscriber } from './base-query-cache.subscriber';
+import { PostgresExtensionService } from './postgres-extension.service';
 
 @Module({
   providers: [
     QueryCacheConfigProvider,
+    PostgresExtensionService,
     BaseQueryCacheService,
     BaseQueryRouter,
     CollectionLoader,
     BaseQueryCacheWriteConsumer,
     BaseQueryCacheSubscriber,
   ],
-  exports: [BaseQueryCacheService, BaseQueryRouter, QueryCacheConfigProvider],
+  exports: [
+    BaseQueryCacheService,
+    BaseQueryRouter,
+    QueryCacheConfigProvider,
+    PostgresExtensionService,
+  ],
 })
 export class BaseQueryCacheModule {}