refactor(base): replace streaming loader with pg-extension CREATE TABLE AS SELECT

Author: Philipinho
Date:   2026-04-23 04:28:25 +01:00
Parent: e663d7eecf
Commit: fde0ccb3c7
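For orientation, a minimal sketch of the flow the new loader follows, built only from @duckdb/node-api calls that appear in this diff. The connection string, the pg alias, and the source table name are illustrative assumptions; the real ATTACH options live in PostgresExtensionService and the real SQL in buildLoaderSql:

import { DuckDBInstance } from '@duckdb/node-api';

// Sketch only: attach Postgres via DuckDB's postgres extension, bulk load
// one base's rows with CREATE TABLE AS SELECT, then detach. Connection
// string, alias, and table name are assumed, not taken from this commit.
async function sketchLoad(baseId: string, workspaceId: string) {
  const instance = await DuckDBInstance.create(':memory:');
  const connection = await instance.connect();
  await connection.run('INSTALL postgres');
  await connection.run('LOAD postgres');
  await connection.run(
    "ATTACH 'host=localhost dbname=docmost' AS pg (TYPE postgres, READ_ONLY)",
  );
  // One pass: Postgres extracts and casts server-side, DuckDB materializes
  // the result as a local table that later queries hit exclusively.
  const prepared = await connection.prepare(
    'CREATE TABLE rows AS SELECT * FROM pg.public.base_rows ' +
      'WHERE base_id = $1 AND workspace_id = $2 AND deleted_at IS NULL',
  );
  prepared.bindVarchar(1, baseId);
  prepared.bindVarchar(2, workspaceId);
  await prepared.run();
  await connection.run('DETACH pg');
  return { instance, connection };
}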
@@ -2,19 +2,11 @@ import { Injectable, Logger } from '@nestjs/common';
import { DuckDBInstance } from '@duckdb/node-api';
import { BaseRepo } from '@docmost/db/repos/base/base.repo';
import { BasePropertyRepo } from '@docmost/db/repos/base/base-property.repo';
-import { BaseRowRepo } from '@docmost/db/repos/base/base-row.repo';
-import { BaseRow } from '@docmost/db/types/entity.types';
-import { BasePropertyType } from '../base.schemas';
import { buildColumnSpecs } from './column-types';
-import { ColumnSpec, LoadedCollection } from './query-cache.types';
-// System property type → DuckDB system column name (snake_case). Mirrors
-// the mapping in duckdb-query-builder.ts.
-const SYSTEM_PROPERTY_COLUMN: Record<string, keyof BaseRow> = {
-[BasePropertyType.CREATED_AT]: 'createdAt',
-[BasePropertyType.LAST_EDITED_AT]: 'updatedAt',
-[BasePropertyType.LAST_EDITED_BY]: 'lastUpdatedById',
-};
+import { buildLoaderSql } from './loader-sql';
+import { LoadedCollection } from './query-cache.types';
+import { PostgresExtensionService } from './postgres-extension.service';
+import { QueryCacheConfigProvider } from './query-cache.config';
@Injectable()
export class CollectionLoader {
@@ -23,10 +15,18 @@ export class CollectionLoader {
constructor(
private readonly baseRepo: BaseRepo,
private readonly basePropertyRepo: BasePropertyRepo,
-private readonly baseRowRepo: BaseRowRepo,
+private readonly pgExtension: PostgresExtensionService,
+private readonly config: QueryCacheConfigProvider,
) {}
async load(baseId: string, workspaceId: string): Promise<LoadedCollection> {
+if (!this.pgExtension.isReady()) {
+throw new Error(
+`Cannot load collection ${baseId}: postgres extension not ready. ` +
+'Check PostgresExtensionService bootstrap logs.',
+);
+}
const base = await this.baseRepo.findById(baseId);
if (!base) {
throw new Error(`Base ${baseId} not found`);
@@ -36,75 +36,30 @@ export class CollectionLoader {
const properties = await this.basePropertyRepo.findByBaseId(baseId);
const specs = buildColumnSpecs(properties);
-const instance = await DuckDBInstance.create(':memory:');
+const { memoryLimit, threads } = this.config.config;
+const instance = await DuckDBInstance.create(':memory:', {
+memory_limit: memoryLimit,
+threads: String(threads),
+});
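// The two create() options above correspond to DuckDB's memory_limit and
// threads settings. QueryCacheConfigProvider is not shown in this diff; the
// shape the loader appears to assume (a sketch, field types assumed):
//
//   interface QueryCacheConfig {
//     memoryLimit: string; // e.g. '512MB'
//     threads: number;     // stringified above for DuckDB's option map
//   }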
const connection = await instance.connect();
-let appender: Awaited<ReturnType<typeof connection.createAppender>> | null =
-null;
try {
-const ddl = `CREATE TABLE rows (${specs
-.map((s) => `${quoteIdent(s.column)} ${s.ddlType}`)
-.join(', ')}, PRIMARY KEY (${quoteIdent('id')}))`;
-await connection.run(ddl);
+await this.pgExtension.configureOnConnection(connection);
-appender = await connection.createAppender('rows');
+// Bulk load via CREATE TABLE AS SELECT. JSONB extraction happens
+// server-side via the base_cell_* helpers; DuckDB streams typed
+// columns over COPY BINARY into its vectorized insert path.
+const sql = buildLoaderSql(specs);
+const prepared = await connection.prepare(sql);
+prepared.bindVarchar(1, baseId);
+prepared.bindVarchar(2, workspaceId);
+await prepared.run();
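// Illustratively, buildLoaderSql(specs) would emit a CTAS of roughly this
// shape. The base_cell_* helper names come from the comment above; the
// exact helpers, pg alias, and source table name are assumptions:
//
//   CREATE TABLE rows AS
//   SELECT r.id, r.position, r.created_at, r.updated_at,
//          base_cell_text(r.cells, '<propertyId>') AS "my_text_col",
//          base_cell_double(r.cells, '<propertyId>') AS "my_number_col"
//   FROM pg.public.base_rows r
//   WHERE r.base_id = $1 AND r.workspace_id = $2 AND r.deleted_at IS NULL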
-let rowCount = 0;
-for await (const chunk of this.baseRowRepo.streamByBaseId(baseId, {
-workspaceId,
-chunkSize: 5000,
-})) {
-for (const row of chunk) {
-for (const spec of specs) {
-const raw = readFromRow(row, spec);
-if (raw == null) {
-appender.appendNull();
-continue;
-}
-switch (spec.ddlType) {
-case 'VARCHAR':
-appender.appendVarchar(String(raw));
-break;
-case 'DOUBLE': {
-const n = Number(raw);
-if (Number.isNaN(n)) {
-this.logger.debug(
-`Malformed number for ${spec.column} on row ${row.id}`,
-);
-appender.appendNull();
-break;
-}
-appender.appendDouble(n);
-break;
-}
-case 'BOOLEAN':
-appender.appendBoolean(Boolean(raw));
-break;
-case 'TIMESTAMPTZ': {
-const d = raw instanceof Date ? raw : new Date(String(raw));
-if (Number.isNaN(d.getTime())) {
-this.logger.debug(
-`Malformed timestamp for ${spec.column} on row ${row.id}`,
-);
-appender.appendNull();
-break;
-}
-appender.appendVarchar(d.toISOString());
-break;
-}
-case 'JSON':
-appender.appendVarchar(JSON.stringify(raw));
-break;
-}
-}
-appender.endRow();
-rowCount++;
-}
-}
-appender.flushSync();
-appender.closeSync();
-appender = null;
+// Release the PG connection held by the ATTACH — we're done with
+// Postgres; all subsequent queries run purely against the local table.
+await this.pgExtension.detach(connection);
// Build ART indexes on indexable columns.
for (const spec of specs) {
if (!spec.indexable) continue;
const safe = spec.column.replace(/[^a-zA-Z0-9_]/g, '_');
@@ -113,17 +68,17 @@ export class CollectionLoader {
);
}
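// The per-column statement is elided by the hunk boundary above; given the
// safe-name mangling it is presumably DDL of this shape (an assumption,
// not shown in the diff):
//
//   CREATE INDEX idx_rows_<safe> ON rows ("<column>")
//
// DuckDB implements such indexes as ART indexes, matching the comment at
// the top of the loop.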
-this.logger.debug(
-`Loaded ${rowCount} rows for base ${baseId} (schemaVersion=${schemaVersion})`,
-);
const countResult = await connection.runAndReadAll(
'SELECT count(*) AS c FROM rows',
);
-const cachedRowCount = Number(
+const rowCount = Number(
(countResult.getRowObjects()[0] as { c: bigint | number }).c,
);
+this.logger.debug(
+`Loaded ${rowCount} rows for base ${baseId} (schemaVersion=${schemaVersion})`,
+);
return {
baseId,
schemaVersion,
@@ -131,67 +86,29 @@ export class CollectionLoader {
instance,
connection,
lastAccessedAt: Date.now(),
-rowCount: cachedRowCount,
+rowCount,
};
} catch (err) {
-if (appender) {
try {
-appender.closeSync();
+await this.pgExtension.detach(connection);
} catch {
-// swallow — best-effort cleanup
-}
+/* swallow */
}
try {
connection.closeSync();
} catch {
-// swallow — best-effort cleanup
+/* swallow */
}
try {
instance.closeSync();
} catch {
-// swallow — best-effort cleanup
+/* swallow */
}
throw err;
}
}
}
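// For reference, the LoadedCollection shape implied by the return statement
// above (declared in query-cache.types.ts; fields hidden by the hunk
// boundaries are omitted, and the schemaVersion type is assumed):
//
//   interface LoadedCollection {
//     baseId: string;
//     schemaVersion: number;
//     instance: DuckDBInstance;
//     connection: Awaited<ReturnType<DuckDBInstance['connect']>>;
//     lastAccessedAt: number; // Date.now() epoch millis
//     rowCount: number;
//   }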
-function readFromRow(row: BaseRow, spec: ColumnSpec): unknown {
-// System columns
-switch (spec.column) {
-case 'id':
-return row.id;
-case 'base_id':
-return row.baseId;
-case 'workspace_id':
-return row.workspaceId;
-case 'creator_id':
-return row.creatorId;
-case 'position':
-return row.position;
-case 'created_at':
-return row.createdAt;
-case 'updated_at':
-return row.updatedAt;
-case 'last_updated_by_id':
-return row.lastUpdatedById;
-case 'deleted_at':
-return null; // loader only inserts live rows
-case 'search_text':
-return ''; // search stays on Postgres in v1
-}
-// User-defined columns: look up by property id
-const prop = spec.property;
-if (!prop) return null;
-const sysColumn = SYSTEM_PROPERTY_COLUMN[prop.type];
-if (sysColumn) return (row as any)[sysColumn];
-const cells = (row.cells as Record<string, unknown> | null) ?? {};
-return cells[prop.id] ?? null;
-}
-function quoteIdent(name: string): string {
-return `"${name.replace(/"/g, '""')}"`;
-}
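The loader leans on three PostgresExtensionService calls: isReady(), configureOnConnection(), and detach(). The service itself is not part of this diff; the surface the loader assumes, reconstructed from those call sites (method semantics here are assumptions):

import type { DuckDBConnection } from '@duckdb/node-api';

interface PostgresExtensionLike {
  // True once the postgres extension was installed/loaded at bootstrap.
  isReady(): boolean;
  // Assumed to run LOAD postgres plus an ATTACH ... (TYPE postgres) on the
  // given connection so loader SQL can read the attached pg catalog.
  configureOnConnection(connection: DuckDBConnection): Promise<void>;
  // Assumed to DETACH the Postgres catalog, releasing its PG connection.
  detach(connection: DuckDBConnection): Promise<void>;
}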