fix(server): close duckdb resources on load failure, dedupe concurrent loads, drop unused cells projection

This commit is contained in:
Philipinho
2026-04-19 21:39:05 +01:00
parent 91ad3de258
commit 45000bbd8b
3 changed files with 120 additions and 92 deletions
@@ -40,6 +40,7 @@ export class BaseQueryCacheService
{
private readonly logger = new Logger(BaseQueryCacheService.name);
private readonly collections = new Map<string, LoadedCollection>();
private readonly inFlightLoads = new Map<string, Promise<LoadedCollection>>();
constructor(
private readonly configProvider: QueryCacheConfigProvider,
@@ -168,6 +169,8 @@ export class BaseQueryCacheService
baseId: string,
workspaceId: string,
): Promise<LoadedCollection> {
// TODO(task-7): remove per-request findById once pub/sub invalidation
// keeps collections in sync with schema bumps.
const existing = this.collections.get(baseId);
const base = await this.baseRepo.findById(baseId);
@@ -186,14 +189,24 @@ export class BaseQueryCacheService
this.collections.delete(baseId);
}
const { maxCollections } = this.configProvider.config;
if (this.collections.size >= maxCollections) {
this.evictLru();
}
const inFlight = this.inFlightLoads.get(baseId);
if (inFlight) return inFlight;
const loaded = await this.collectionLoader.load(baseId, workspaceId);
this.collections.set(baseId, loaded);
return loaded;
const promise = (async () => {
try {
const { maxCollections } = this.configProvider.config;
if (this.collections.size >= maxCollections) {
this.evictLru();
}
const loaded = await this.collectionLoader.load(baseId, workspaceId);
this.collections.set(baseId, loaded);
return loaded;
} finally {
this.inFlightLoads.delete(baseId);
}
})();
this.inFlightLoads.set(baseId, promise);
return promise;
}
private evictLru(): void {
@@ -228,10 +241,10 @@ export class BaseQueryCacheService
}
// Convert a DuckDB row object back into the BaseRow JSON shape. The builder
// projects `cells` as a json_object keyed by property id; typed columns
// (DOUBLE, BOOLEAN, TIMESTAMPTZ) round-trip as JS primitives / Date objects.
// We reconstruct `cells` directly from the per-property columns so the JSON
// payload matches what Postgres returns.
// projects one column per user property; typed columns (DOUBLE, BOOLEAN,
// TIMESTAMPTZ) round-trip as JS primitives / Date objects. We reconstruct
// `cells` directly from the per-property columns so the JSON payload matches
// what Postgres returns.
function shapeBaseRow(
raw: Record<string, unknown>,
specs: ColumnSpec[],
@@ -38,89 +38,113 @@ export class CollectionLoader {
const instance = await DuckDBInstance.create(':memory:');
const connection = await instance.connect();
let appender: Awaited<ReturnType<typeof connection.createAppender>> | null =
null;
const ddl = `CREATE TABLE rows (${specs
.map((s) => `${quoteIdent(s.column)} ${s.ddlType}`)
.join(', ')}, PRIMARY KEY (${quoteIdent('id')}))`;
await connection.run(ddl);
try {
const ddl = `CREATE TABLE rows (${specs
.map((s) => `${quoteIdent(s.column)} ${s.ddlType}`)
.join(', ')}, PRIMARY KEY (${quoteIdent('id')}))`;
await connection.run(ddl);
const appender = await connection.createAppender('rows');
appender = await connection.createAppender('rows');
let rowCount = 0;
for await (const chunk of this.baseRowRepo.streamByBaseId(baseId, {
workspaceId,
chunkSize: 5000,
})) {
for (const row of chunk) {
for (const spec of specs) {
const raw = readFromRow(row, spec);
if (raw == null) {
appender.appendNull();
continue;
}
switch (spec.ddlType) {
case 'VARCHAR':
appender.appendVarchar(String(raw));
break;
case 'DOUBLE': {
const n = Number(raw);
if (Number.isNaN(n)) {
this.logger.debug(
`Malformed number for ${spec.column} on row ${row.id}`,
);
appender.appendNull();
let rowCount = 0;
for await (const chunk of this.baseRowRepo.streamByBaseId(baseId, {
workspaceId,
chunkSize: 5000,
})) {
for (const row of chunk) {
for (const spec of specs) {
const raw = readFromRow(row, spec);
if (raw == null) {
appender.appendNull();
continue;
}
switch (spec.ddlType) {
case 'VARCHAR':
appender.appendVarchar(String(raw));
break;
case 'DOUBLE': {
const n = Number(raw);
if (Number.isNaN(n)) {
this.logger.debug(
`Malformed number for ${spec.column} on row ${row.id}`,
);
appender.appendNull();
break;
}
appender.appendDouble(n);
break;
}
appender.appendDouble(n);
break;
}
case 'BOOLEAN':
appender.appendBoolean(Boolean(raw));
break;
case 'TIMESTAMPTZ': {
const d = raw instanceof Date ? raw : new Date(String(raw));
if (Number.isNaN(d.getTime())) {
this.logger.debug(
`Malformed timestamp for ${spec.column} on row ${row.id}`,
);
appender.appendNull();
case 'BOOLEAN':
appender.appendBoolean(Boolean(raw));
break;
case 'TIMESTAMPTZ': {
const d = raw instanceof Date ? raw : new Date(String(raw));
if (Number.isNaN(d.getTime())) {
this.logger.debug(
`Malformed timestamp for ${spec.column} on row ${row.id}`,
);
appender.appendNull();
break;
}
appender.appendVarchar(d.toISOString());
break;
}
appender.appendVarchar(d.toISOString());
break;
case 'JSON':
appender.appendVarchar(JSON.stringify(raw));
break;
}
case 'JSON':
appender.appendVarchar(JSON.stringify(raw));
break;
}
appender.endRow();
rowCount++;
}
appender.endRow();
rowCount++;
}
}
appender.flushSync();
appender.closeSync();
appender.flushSync();
appender.closeSync();
appender = null;
for (const spec of specs) {
if (!spec.indexable) continue;
const safe = spec.column.replace(/[^a-zA-Z0-9_]/g, '_');
await connection.run(
`CREATE INDEX ${quoteIdent(`idx_${safe}`)} ON rows (${quoteIdent(spec.column)})`,
for (const spec of specs) {
if (!spec.indexable) continue;
const safe = spec.column.replace(/[^a-zA-Z0-9_]/g, '_');
await connection.run(
`CREATE INDEX ${quoteIdent(`idx_${safe}`)} ON rows (${quoteIdent(spec.column)})`,
);
}
this.logger.debug(
`Loaded ${rowCount} rows for base ${baseId} (schemaVersion=${schemaVersion})`,
);
return {
baseId,
schemaVersion,
columns: specs,
instance,
connection,
lastAccessedAt: Date.now(),
};
} catch (err) {
if (appender) {
try {
appender.closeSync();
} catch {
// swallow — best-effort cleanup
}
}
try {
connection.closeSync();
} catch {
// swallow — best-effort cleanup
}
try {
instance.closeSync();
} catch {
// swallow — best-effort cleanup
}
throw err;
}
this.logger.debug(
`Loaded ${rowCount} rows for base ${baseId} (schemaVersion=${schemaVersion})`,
);
return {
baseId,
schemaVersion,
columns: specs,
instance,
connection,
lastAccessedAt: Date.now(),
};
}
}
@@ -97,11 +97,9 @@ export function buildDuckDbListQuery(
// --- select projection -------------------------------------------------
function buildSelect(index: ColumnIndex, sortBuilds: SortBuild[]): string[] {
const cellsJson = buildCellsJson(index.userColumns);
const parts: string[] = [
'id',
'base_id',
`${cellsJson} AS cells`,
'position',
'creator_id',
'last_updated_by_id',
@@ -110,22 +108,15 @@ function buildSelect(index: ColumnIndex, sortBuilds: SortBuild[]): string[] {
'updated_at',
'deleted_at',
];
for (const col of index.userColumns) {
parts.push(quoteIdent(col.column));
}
for (const sb of sortBuilds) {
parts.push(`${sb.expression} AS ${sb.key}`);
}
return parts;
}
function buildCellsJson(userColumns: ColumnSpec[]): string {
if (userColumns.length === 0) return `'{}'::JSON`;
const entries: string[] = [];
for (const col of userColumns) {
entries.push(`'${col.column}'`);
entries.push(quoteIdent(col.column));
}
return `json_object(${entries.join(', ')})`;
}
// --- filter ------------------------------------------------------------
function buildFilter(