diff --git a/apps/server/src/core/base/query-cache/collection-loader.ts b/apps/server/src/core/base/query-cache/collection-loader.ts
index 77024c10..ba97ca0c 100644
--- a/apps/server/src/core/base/query-cache/collection-loader.ts
+++ b/apps/server/src/core/base/query-cache/collection-loader.ts
@@ -2,19 +2,11 @@ import { Injectable, Logger } from '@nestjs/common';
 import { DuckDBInstance } from '@duckdb/node-api';
 import { BaseRepo } from '@docmost/db/repos/base/base.repo';
 import { BasePropertyRepo } from '@docmost/db/repos/base/base-property.repo';
-import { BaseRowRepo } from '@docmost/db/repos/base/base-row.repo';
-import { BaseRow } from '@docmost/db/types/entity.types';
-import { BasePropertyType } from '../base.schemas';
 import { buildColumnSpecs } from './column-types';
-import { ColumnSpec, LoadedCollection } from './query-cache.types';
-
-// System property type → DuckDB system column name (snake_case). Mirrors
-// the mapping in duckdb-query-builder.ts.
-const SYSTEM_PROPERTY_COLUMN: Partial<Record<BasePropertyType, string>> = {
-  [BasePropertyType.CREATED_AT]: 'createdAt',
-  [BasePropertyType.LAST_EDITED_AT]: 'updatedAt',
-  [BasePropertyType.LAST_EDITED_BY]: 'lastUpdatedById',
-};
+import { buildLoaderSql } from './loader-sql';
+import { LoadedCollection } from './query-cache.types';
+import { PostgresExtensionService } from './postgres-extension.service';
+import { QueryCacheConfigProvider } from './query-cache.config';
 
 @Injectable()
 export class CollectionLoader {
@@ -23,10 +15,18 @@ export class CollectionLoader {
   constructor(
     private readonly baseRepo: BaseRepo,
     private readonly basePropertyRepo: BasePropertyRepo,
-    private readonly baseRowRepo: BaseRowRepo,
+    private readonly pgExtension: PostgresExtensionService,
+    private readonly config: QueryCacheConfigProvider,
   ) {}
 
   async load(baseId: string, workspaceId: string): Promise<LoadedCollection> {
+    if (!this.pgExtension.isReady()) {
+      throw new Error(
+        `Cannot load collection ${baseId}: postgres extension not ready. ` +
+          'Check PostgresExtensionService bootstrap logs.',
+      );
+    }
+
     const base = await this.baseRepo.findById(baseId);
     if (!base) {
       throw new Error(`Base ${baseId} not found`);
@@ -36,75 +36,30 @@ export class CollectionLoader {
     const properties = await this.basePropertyRepo.findByBaseId(baseId);
     const specs = buildColumnSpecs(properties);
 
-    const instance = await DuckDBInstance.create(':memory:');
+    const { memoryLimit, threads } = this.config.config;
+    const instance = await DuckDBInstance.create(':memory:', {
+      memory_limit: memoryLimit,
+      threads: String(threads),
+    });
     const connection = await instance.connect();
-    let appender: Awaited<ReturnType<typeof connection.createAppender>> | null =
-      null;
 
     try {
-      const ddl = `CREATE TABLE rows (${specs
-        .map((s) => `${quoteIdent(s.column)} ${s.ddlType}`)
-        .join(', ')}, PRIMARY KEY (${quoteIdent('id')}))`;
-      await connection.run(ddl);
+      await this.pgExtension.configureOnConnection(connection);
 
-      appender = await connection.createAppender('rows');
+      // Bulk load via CREATE TABLE AS SELECT. JSONB extraction happens
+      // server-side via the base_cell_* helpers; DuckDB streams typed
+      // columns over COPY BINARY into its vectorized insert path.
+      const sql = buildLoaderSql(specs);
+      const prepared = await connection.prepare(sql);
+      prepared.bindVarchar(1, baseId);
+      prepared.bindVarchar(2, workspaceId);
+      await prepared.run();
 
-      let rowCount = 0;
-      for await (const chunk of this.baseRowRepo.streamByBaseId(baseId, {
-        workspaceId,
-        chunkSize: 5000,
-      })) {
-        for (const row of chunk) {
-          for (const spec of specs) {
-            const raw = readFromRow(row, spec);
-            if (raw == null) {
-              appender.appendNull();
-              continue;
-            }
-            switch (spec.ddlType) {
-              case 'VARCHAR':
-                appender.appendVarchar(String(raw));
-                break;
-              case 'DOUBLE': {
-                const n = Number(raw);
-                if (Number.isNaN(n)) {
-                  this.logger.debug(
-                    `Malformed number for ${spec.column} on row ${row.id}`,
-                  );
-                  appender.appendNull();
-                  break;
-                }
-                appender.appendDouble(n);
-                break;
-              }
-              case 'BOOLEAN':
-                appender.appendBoolean(Boolean(raw));
-                break;
-              case 'TIMESTAMPTZ': {
-                const d = raw instanceof Date ? raw : new Date(String(raw));
-                if (Number.isNaN(d.getTime())) {
-                  this.logger.debug(
-                    `Malformed timestamp for ${spec.column} on row ${row.id}`,
-                  );
-                  appender.appendNull();
-                  break;
-                }
-                appender.appendVarchar(d.toISOString());
-                break;
-              }
-              case 'JSON':
-                appender.appendVarchar(JSON.stringify(raw));
-                break;
-            }
-          }
-          appender.endRow();
-          rowCount++;
-        }
-      }
-      appender.flushSync();
-      appender.closeSync();
-      appender = null;
+      // Release the PG connection held by the ATTACH — we're done with
+      // Postgres; all subsequent queries run purely against the local table.
+      await this.pgExtension.detach(connection);
 
+      // Build ART indexes on indexable columns.
       for (const spec of specs) {
         if (!spec.indexable) continue;
         const safe = spec.column.replace(/[^a-zA-Z0-9_]/g, '_');
         await connection.run(
           `CREATE INDEX idx_${safe} ON rows (${quoteIdent(spec.column)})`,
         );
       }
 
-      this.logger.debug(
-        `Loaded ${rowCount} rows for base ${baseId} (schemaVersion=${schemaVersion})`,
-      );
-
       const countResult = await connection.runAndReadAll(
         'SELECT count(*) AS c FROM rows',
       );
-      const cachedRowCount = Number(
+      const rowCount = Number(
         (countResult.getRowObjects()[0] as { c: bigint | number }).c,
       );
 
+      this.logger.debug(
+        `Loaded ${rowCount} rows for base ${baseId} (schemaVersion=${schemaVersion})`,
+      );
+
       return {
         baseId,
         schemaVersion,
@@ -131,67 +86,29 @@ export class CollectionLoader {
         instance,
         connection,
         lastAccessedAt: Date.now(),
-        rowCount: cachedRowCount,
+        rowCount,
       };
     } catch (err) {
-      if (appender) {
-        try {
-          appender.closeSync();
-        } catch {
-          // swallow — best-effort cleanup
-        }
+      try {
+        await this.pgExtension.detach(connection);
+      } catch {
+        /* swallow */
       }
       try {
         connection.closeSync();
       } catch {
-        // swallow — best-effort cleanup
+        /* swallow */
       }
       try {
         instance.closeSync();
       } catch {
-        // swallow — best-effort cleanup
+        /* swallow */
       }
       throw err;
     }
   }
 }
 
-function readFromRow(row: BaseRow, spec: ColumnSpec): unknown {
-  // System columns
-  switch (spec.column) {
-    case 'id':
-      return row.id;
-    case 'base_id':
-      return row.baseId;
-    case 'workspace_id':
-      return row.workspaceId;
-    case 'creator_id':
-      return row.creatorId;
-    case 'position':
-      return row.position;
-    case 'created_at':
-      return row.createdAt;
-    case 'updated_at':
-      return row.updatedAt;
-    case 'last_updated_by_id':
-      return row.lastUpdatedById;
-    case 'deleted_at':
-      return null; // loader only inserts live rows
-    case 'search_text':
-      return ''; // search stays on Postgres in v1
-  }
-
-  // User-defined columns: look up by property id
-  const prop = spec.property;
-  if (!prop) return null;
-
-  const sysColumn = SYSTEM_PROPERTY_COLUMN[prop.type];
-  if (sysColumn) return (row as any)[sysColumn];
-
-  const cells = (row.cells as Record<string, unknown> | null) ?? {};
-  return cells[prop.id] ?? null;
-}
-
 function quoteIdent(name: string): string {
   return `"${name.replace(/"/g, '""')}"`;
 }
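
Reviewer note: for context, here is a minimal sketch of the shape `buildLoaderSql` (./loader-sql.ts, added elsewhere in this PR) might take. The `sourceExpr` field, the `pg.base_rows` source, the `deleted_at` filter, and the exact WHERE clause are assumptions inferred from the comments in this hunk, not the real implementation:

```ts
// Hypothetical sketch only: loader-sql.ts is not shown in this diff.
// `sourceExpr` is an assumed field holding the SQL expression (e.g. a
// base_cell_* helper call) that projects the typed value out of the
// attached Postgres table.
interface LoaderColumnSpec {
  column: string; // DuckDB column name (field seen in this diff)
  sourceExpr: string; // assumed: server-side extraction expression
}

export function buildLoaderSql(specs: LoaderColumnSpec[]): string {
  const projections = specs
    .map((spec) => `${spec.sourceExpr} AS ${quoteIdent(spec.column)}`)
    .join(', ');

  // $1 = baseId, $2 = workspaceId, bound as VARCHAR by CollectionLoader.
  // `pg` is the assumed alias under which Postgres was ATTACHed.
  return (
    `CREATE TABLE rows AS SELECT ${projections} ` +
    'FROM pg.base_rows ' +
    'WHERE base_id = $1 AND workspace_id = $2 AND deleted_at IS NULL'
  );
}

function quoteIdent(name: string): string {
  return `"${name.replace(/"/g, '""')}"`;
}
```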
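Likewise, a rough sketch of the attach/detach lifecycle that `configureOnConnection` and `detach` imply, assuming DuckDB's postgres extension. The service shape, the `pg` alias, the readiness bookkeeping, and the DSN source are placeholders, not the real PostgresExtensionService:

```ts
import { Injectable } from '@nestjs/common';
import { DuckDBConnection } from '@duckdb/node-api';

// Hypothetical shape only: the real service lives in
// postgres-extension.service.ts, which is not part of this hunk.
@Injectable()
export class PostgresExtensionServiceSketch {
  // Assumed: flipped to true once INSTALL/LOAD is verified at bootstrap;
  // CollectionLoader.load() refuses to run until then.
  private ready = false;

  private readonly pgDsn = process.env.DATABASE_URL ?? '';

  isReady(): boolean {
    return this.ready;
  }

  async configureOnConnection(connection: DuckDBConnection): Promise<void> {
    // Assumes the extension binary was INSTALLed during bootstrap, so
    // per-connection setup only needs LOAD + ATTACH.
    await connection.run('LOAD postgres;');
    await connection.run(
      `ATTACH '${this.pgDsn}' AS pg (TYPE POSTGRES, READ_ONLY);`,
    );
  }

  async detach(connection: DuckDBConnection): Promise<void> {
    // Frees the underlying Postgres connection once the local `rows`
    // table has been materialized.
    await connection.run('DETACH pg;');
  }
}
```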