diff --git a/apps/server/src/core/base/query-cache/base-query-cache.service.ts b/apps/server/src/core/base/query-cache/base-query-cache.service.ts
index 25a81e57..c3eddfed 100644
--- a/apps/server/src/core/base/query-cache/base-query-cache.service.ts
+++ b/apps/server/src/core/base/query-cache/base-query-cache.service.ts
@@ -40,6 +40,7 @@ export class BaseQueryCacheService {
   private readonly logger = new Logger(BaseQueryCacheService.name);
   private readonly collections = new Map();
+  private readonly inFlightLoads = new Map>();
 
   constructor(
     private readonly configProvider: QueryCacheConfigProvider,
@@ -168,6 +169,8 @@ export class BaseQueryCacheService
     baseId: string,
     workspaceId: string,
   ): Promise {
+    // TODO(task-7): remove per-request findById once pub/sub invalidation
+    // keeps collections in sync with schema bumps.
     const existing = this.collections.get(baseId);
 
     const base = await this.baseRepo.findById(baseId);
@@ -186,14 +189,24 @@ export class BaseQueryCacheService
       this.collections.delete(baseId);
     }
 
-    const { maxCollections } = this.configProvider.config;
-    if (this.collections.size >= maxCollections) {
-      this.evictLru();
-    }
+    const inFlight = this.inFlightLoads.get(baseId);
+    if (inFlight) return inFlight;
 
-    const loaded = await this.collectionLoader.load(baseId, workspaceId);
-    this.collections.set(baseId, loaded);
-    return loaded;
+    const promise = (async () => {
+      try {
+        const { maxCollections } = this.configProvider.config;
+        if (this.collections.size >= maxCollections) {
+          this.evictLru();
+        }
+        const loaded = await this.collectionLoader.load(baseId, workspaceId);
+        this.collections.set(baseId, loaded);
+        return loaded;
+      } finally {
+        this.inFlightLoads.delete(baseId);
+      }
+    })();
+    this.inFlightLoads.set(baseId, promise);
+    return promise;
   }
 
   private evictLru(): void {
@@ -228,10 +241,10 @@ export class BaseQueryCacheService
 }
 
 // Convert a DuckDB row object back into the BaseRow JSON shape. The builder
-// projects `cells` as a json_object keyed by property id; typed columns
-// (DOUBLE, BOOLEAN, TIMESTAMPTZ) round-trip as JS primitives / Date objects.
-// We reconstruct `cells` directly from the per-property columns so the JSON
-// payload matches what Postgres returns.
+// projects one column per user property; typed columns (DOUBLE, BOOLEAN,
+// TIMESTAMPTZ) round-trip as JS primitives / Date objects. We reconstruct
+// `cells` directly from the per-property columns so the JSON payload matches
+// what Postgres returns.
 function shapeBaseRow(
   raw: Record,
   specs: ColumnSpec[],
diff --git a/apps/server/src/core/base/query-cache/collection-loader.ts b/apps/server/src/core/base/query-cache/collection-loader.ts
index 2a273324..2fe59823 100644
--- a/apps/server/src/core/base/query-cache/collection-loader.ts
+++ b/apps/server/src/core/base/query-cache/collection-loader.ts
@@ -38,89 +38,113 @@ export class CollectionLoader {
     const instance = await DuckDBInstance.create(':memory:');
     const connection = await instance.connect();
+    let appender: Awaited> | null =
+      null;
 
-    const ddl = `CREATE TABLE rows (${specs
-      .map((s) => `${quoteIdent(s.column)} ${s.ddlType}`)
-      .join(', ')}, PRIMARY KEY (${quoteIdent('id')}))`;
-    await connection.run(ddl);
+    try {
+      const ddl = `CREATE TABLE rows (${specs
+        .map((s) => `${quoteIdent(s.column)} ${s.ddlType}`)
+        .join(', ')}, PRIMARY KEY (${quoteIdent('id')}))`;
+      await connection.run(ddl);
 
-    const appender = await connection.createAppender('rows');
+      appender = await connection.createAppender('rows');
 
-    let rowCount = 0;
-    for await (const chunk of this.baseRowRepo.streamByBaseId(baseId, {
-      workspaceId,
-      chunkSize: 5000,
-    })) {
-      for (const row of chunk) {
-        for (const spec of specs) {
-          const raw = readFromRow(row, spec);
-          if (raw == null) {
-            appender.appendNull();
-            continue;
-          }
-          switch (spec.ddlType) {
-            case 'VARCHAR':
-              appender.appendVarchar(String(raw));
-              break;
-            case 'DOUBLE': {
-              const n = Number(raw);
-              if (Number.isNaN(n)) {
-                this.logger.debug(
-                  `Malformed number for ${spec.column} on row ${row.id}`,
-                );
-                appender.appendNull();
+      let rowCount = 0;
+      for await (const chunk of this.baseRowRepo.streamByBaseId(baseId, {
+        workspaceId,
+        chunkSize: 5000,
+      })) {
+        for (const row of chunk) {
+          for (const spec of specs) {
+            const raw = readFromRow(row, spec);
+            if (raw == null) {
+              appender.appendNull();
+              continue;
+            }
+            switch (spec.ddlType) {
+              case 'VARCHAR':
+                appender.appendVarchar(String(raw));
+                break;
+              case 'DOUBLE': {
+                const n = Number(raw);
+                if (Number.isNaN(n)) {
+                  this.logger.debug(
+                    `Malformed number for ${spec.column} on row ${row.id}`,
+                  );
+                  appender.appendNull();
+                  break;
+                }
+                appender.appendDouble(n);
                 break;
               }
-              appender.appendDouble(n);
-              break;
-            }
-            case 'BOOLEAN':
-              appender.appendBoolean(Boolean(raw));
-              break;
-            case 'TIMESTAMPTZ': {
-              const d = raw instanceof Date ? raw : new Date(String(raw));
-              if (Number.isNaN(d.getTime())) {
-                this.logger.debug(
-                  `Malformed timestamp for ${spec.column} on row ${row.id}`,
-                );
-                appender.appendNull();
+              case 'BOOLEAN':
+                appender.appendBoolean(Boolean(raw));
+                break;
+              case 'TIMESTAMPTZ': {
+                const d = raw instanceof Date ? raw : new Date(String(raw));
+                if (Number.isNaN(d.getTime())) {
+                  this.logger.debug(
+                    `Malformed timestamp for ${spec.column} on row ${row.id}`,
+                  );
+                  appender.appendNull();
+                  break;
+                }
+                appender.appendVarchar(d.toISOString());
                 break;
               }
-              appender.appendVarchar(d.toISOString());
-              break;
+              case 'JSON':
+                appender.appendVarchar(JSON.stringify(raw));
+                break;
             }
-            case 'JSON':
-              appender.appendVarchar(JSON.stringify(raw));
-              break;
           }
+          appender.endRow();
+          rowCount++;
         }
-        appender.endRow();
-        rowCount++;
       }
-    }
 
-    appender.flushSync();
-    appender.closeSync();
+      appender.flushSync();
+      appender.closeSync();
+      appender = null;
 
-    for (const spec of specs) {
-      if (!spec.indexable) continue;
-      const safe = spec.column.replace(/[^a-zA-Z0-9_]/g, '_');
-      await connection.run(
-        `CREATE INDEX ${quoteIdent(`idx_${safe}`)} ON rows (${quoteIdent(spec.column)})`,
+      for (const spec of specs) {
+        if (!spec.indexable) continue;
+        const safe = spec.column.replace(/[^a-zA-Z0-9_]/g, '_');
+        await connection.run(
+          `CREATE INDEX ${quoteIdent(`idx_${safe}`)} ON rows (${quoteIdent(spec.column)})`,
+        );
+      }
+
+      this.logger.debug(
+        `Loaded ${rowCount} rows for base ${baseId} (schemaVersion=${schemaVersion})`,
       );
+
+      return {
+        baseId,
+        schemaVersion,
+        columns: specs,
+        instance,
+        connection,
+        lastAccessedAt: Date.now(),
+      };
+    } catch (err) {
+      if (appender) {
+        try {
+          appender.closeSync();
+        } catch {
+          // swallow — best-effort cleanup
+        }
+      }
+      try {
+        connection.closeSync();
+      } catch {
+        // swallow — best-effort cleanup
+      }
+      try {
+        instance.closeSync();
+      } catch {
+        // swallow — best-effort cleanup
+      }
+      throw err;
     }
-
-    this.logger.debug(
-      `Loaded ${rowCount} rows for base ${baseId} (schemaVersion=${schemaVersion})`,
-    );
-
-    return {
-      baseId,
-      schemaVersion,
-      columns: specs,
-      instance,
-      connection,
-      lastAccessedAt: Date.now(),
-    };
   }
 }
diff --git a/apps/server/src/core/base/query-cache/duckdb-query-builder.ts b/apps/server/src/core/base/query-cache/duckdb-query-builder.ts
index a5097dce..0da72ff8 100644
--- a/apps/server/src/core/base/query-cache/duckdb-query-builder.ts
+++ b/apps/server/src/core/base/query-cache/duckdb-query-builder.ts
@@ -97,11 +97,9 @@ export function buildDuckDbListQuery(
 
 // --- select projection -------------------------------------------------
 function buildSelect(index: ColumnIndex, sortBuilds: SortBuild[]): string[] {
-  const cellsJson = buildCellsJson(index.userColumns);
   const parts: string[] = [
     'id',
     'base_id',
-    `${cellsJson} AS cells`,
     'position',
     'creator_id',
     'last_updated_by_id',
@@ -110,22 +108,15 @@ function buildSelect(index: ColumnIndex, sortBuilds: SortBuild[]): string[] {
     'updated_at',
     'deleted_at',
   ];
+  for (const col of index.userColumns) {
+    parts.push(quoteIdent(col.column));
+  }
   for (const sb of sortBuilds) {
     parts.push(`${sb.expression} AS ${sb.key}`);
  }
   return parts;
 }
 
-function buildCellsJson(userColumns: ColumnSpec[]): string {
-  if (userColumns.length === 0) return `'{}'::JSON`;
-  const entries: string[] = [];
-  for (const col of userColumns) {
-    entries.push(`'${col.column}'`);
-    entries.push(quoteIdent(col.column));
-  }
-  return `json_object(${entries.join(', ')})`;
-}
-
 // --- filter ------------------------------------------------------------
 function buildFilter(
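For context on the first file: the new inFlightLoads map is a single-flight guard, so concurrent requests for the same baseId share one CollectionLoader.load call instead of each spinning up its own DuckDB instance, and the finally block clears the entry so a failed load can be retried. A minimal standalone sketch of that pattern, with hypothetical names (SingleFlight, loader) and independent of this codebase:

// Single-flight sketch: one shared promise per key; the entry is removed in
// `finally` so the next caller after a failure triggers a fresh load.
class SingleFlight<T> {
  private readonly inFlight = new Map<string, Promise<T>>();

  run(key: string, loader: () => Promise<T>): Promise<T> {
    const existing = this.inFlight.get(key);
    if (existing) return existing;

    const promise = (async () => {
      try {
        return await loader();
      } finally {
        this.inFlight.delete(key);
      }
    })();

    this.inFlight.set(key, promise);
    return promise;
  }
}

// Usage: while the first call is still pending, the second returns the same promise.
// const flights = new SingleFlight<string>();
// const a = flights.run('base-1', async () => 'loaded');
// const b = flights.run('base-1', async () => 'loaded');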