From dbc1eb539cbfcf32731e02043de0d292c5149baf Mon Sep 17 00:00:00 2001 From: Philipinho <16838612+Philipinho@users.noreply.github.com> Date: Thu, 23 Apr 2026 16:50:11 +0100 Subject: [PATCH] fix(base): serialize writer operations and prune dead code in cache service --- .../query-cache/base-query-cache.service.ts | 164 +++++++++++------- 1 file changed, 101 insertions(+), 63 deletions(-) diff --git a/apps/server/src/core/base/query-cache/base-query-cache.service.ts b/apps/server/src/core/base/query-cache/base-query-cache.service.ts index b3cdf286..1172cbac 100644 --- a/apps/server/src/core/base/query-cache/base-query-cache.service.ts +++ b/apps/server/src/core/base/query-cache/base-query-cache.service.ts @@ -52,6 +52,30 @@ export class BaseQueryCacheService private readonly collections = new Map(); private readonly inFlightLoads = new Map>(); + /* + * Serializes every write-path call into the shared writer connection. + * DuckDB connections aren't thread-safe for concurrent prepared statements, + * and Redis pub/sub can fire `applyChange` calls concurrently since the + * subscriber's `pmessage` handler doesn't await. We funnel all writes + * (`upsertRow`, `deleteRow`, `updatePosition`, `refreshRowCount`, + * `invalidate`, `evictLru`) through this simple Promise chain so only + * one is in flight at a time. Reads are unaffected — they flow through + * the reader pool, which handles its own concurrency. + */ + private writeQueue: Promise = Promise.resolve(); + + private async serializeWrite(fn: () => Promise): Promise { + const prev = this.writeQueue; + let unblock!: () => void; + this.writeQueue = new Promise((resolve) => { unblock = resolve; }); + try { + await prev; + return await fn(); + } finally { + unblock(); + } + } + constructor( private readonly configProvider: QueryCacheConfigProvider, private readonly baseRepo: BaseRepo, @@ -204,7 +228,7 @@ export class BaseQueryCacheService const tShape = debug ? Date.now() : 0; const items = duckRows.map((r) => - shapeBaseRow(r, collection.columns, sortBuilds), + shapeBaseRow(r, collection.columns), ); const shapeMs = debug ? Date.now() - tShape : 0; @@ -252,7 +276,9 @@ export class BaseQueryCacheService async invalidate(baseId: string): Promise { const collection = this.collections.get(baseId); if (!collection) return; - await this.runtime.detachBase(collection.schema); + await this.serializeWrite(async () => { + await this.runtime.detachBase(collection.schema); + }); this.collections.delete(baseId); } @@ -372,7 +398,9 @@ export class BaseQueryCacheService } if (existing) { - await this.runtime.detachBase(existing.schema); + await this.serializeWrite(async () => { + await this.runtime.detachBase(existing.schema); + }); this.collections.delete(baseId); } @@ -428,7 +456,9 @@ export class BaseQueryCacheService } if (oldestKey) { const col = this.collections.get(oldestKey)!; - await this.runtime.detachBase(col.schema); + await this.serializeWrite(async () => { + await this.runtime.detachBase(col.schema); + }); this.collections.delete(oldestKey); this.logger.debug(`Evicted LRU collection ${oldestKey}`); } @@ -438,58 +468,62 @@ export class BaseQueryCacheService collection: LoadedCollection, row: Record, ): Promise { - const specs = collection.columns; - const columnList = specs.map((s) => quoteIdent(s.column)).join(', '); - const placeholders = specs.map(() => '?').join(', '); - const sql = `INSERT OR REPLACE INTO ${collection.schema}.rows (${columnList}) VALUES (${placeholders})`; + return this.serializeWrite(async () => { + const specs = collection.columns; + const columnList = specs.map((s) => quoteIdent(s.column)).join(', '); + const placeholders = specs.map(() => '?').join(', '); + const sql = `INSERT OR REPLACE INTO ${collection.schema}.rows (${columnList}) VALUES (${placeholders})`; - const writer = this.runtime.getWriter(); - const prepared = await writer.prepare(sql); - for (let i = 0; i < specs.length; i++) { - const spec = specs[i]; - const oneBased = i + 1; - const raw = readFromRowEvent(row, spec); - if (raw == null) { - prepared.bindNull(oneBased); - continue; - } - switch (spec.ddlType) { - case 'VARCHAR': - prepared.bindVarchar(oneBased, String(raw)); - break; - case 'DOUBLE': { - const n = Number(raw); - if (Number.isNaN(n)) prepared.bindNull(oneBased); - else prepared.bindDouble(oneBased, n); - break; + const writer = this.runtime.getWriter(); + const prepared = await writer.prepare(sql); + for (let i = 0; i < specs.length; i++) { + const spec = specs[i]; + const oneBased = i + 1; + const raw = readFromRowEvent(row, spec); + if (raw == null) { + prepared.bindNull(oneBased); + continue; } - case 'BOOLEAN': - prepared.bindBoolean(oneBased, Boolean(raw)); - break; - case 'TIMESTAMPTZ': { - const d = raw instanceof Date ? raw : new Date(String(raw)); - if (Number.isNaN(d.getTime())) prepared.bindNull(oneBased); - else prepared.bindVarchar(oneBased, d.toISOString()); - break; + switch (spec.ddlType) { + case 'VARCHAR': + prepared.bindVarchar(oneBased, String(raw)); + break; + case 'DOUBLE': { + const n = Number(raw); + if (Number.isNaN(n)) prepared.bindNull(oneBased); + else prepared.bindDouble(oneBased, n); + break; + } + case 'BOOLEAN': + prepared.bindBoolean(oneBased, Boolean(raw)); + break; + case 'TIMESTAMPTZ': { + const d = raw instanceof Date ? raw : new Date(String(raw)); + if (Number.isNaN(d.getTime())) prepared.bindNull(oneBased); + else prepared.bindVarchar(oneBased, d.toISOString()); + break; + } + case 'JSON': + prepared.bindVarchar(oneBased, JSON.stringify(raw)); + break; } - case 'JSON': - prepared.bindVarchar(oneBased, JSON.stringify(raw)); - break; } - } - await prepared.run(); + await prepared.run(); + }); } private async deleteRow( collection: LoadedCollection, rowId: string, ): Promise { - const writer = this.runtime.getWriter(); - const prepared = await writer.prepare( - `DELETE FROM ${collection.schema}.rows WHERE id = ?`, - ); - prepared.bindVarchar(1, rowId); - await prepared.run(); + return this.serializeWrite(async () => { + const writer = this.runtime.getWriter(); + const prepared = await writer.prepare( + `DELETE FROM ${collection.schema}.rows WHERE id = ?`, + ); + prepared.bindVarchar(1, rowId); + await prepared.run(); + }); } private async updatePosition( @@ -497,25 +531,30 @@ export class BaseQueryCacheService rowId: string, position: string, ): Promise { - const writer = this.runtime.getWriter(); - const prepared = await writer.prepare( - `UPDATE ${collection.schema}.rows SET position = ? WHERE id = ?`, - ); - prepared.bindVarchar(1, position); - prepared.bindVarchar(2, rowId); - await prepared.run(); + return this.serializeWrite(async () => { + const writer = this.runtime.getWriter(); + const prepared = await writer.prepare( + `UPDATE ${collection.schema}.rows SET position = ? WHERE id = ?`, + ); + prepared.bindVarchar(1, position); + prepared.bindVarchar(2, rowId); + await prepared.run(); + }); } private async refreshRowCount(collection: LoadedCollection): Promise { - try { - const res = await this.runtime.getWriter().runAndReadAll( - `SELECT count(*) AS c FROM ${collection.schema}.rows`, - ); - const row = res.getRowObjects()[0] as { c: bigint | number }; - collection.rowCount = Number(row.c); - } catch { - // stale rowCount self-corrects on next reload - } + return this.serializeWrite(async () => { + try { + const res = await this.runtime.getWriter().runAndReadAll( + `SELECT count(*) AS c FROM ${collection.schema}.rows`, + ); + const row = res.getRowObjects()[0] as { c: bigint | number }; + collection.rowCount = Number(row.c); + collection.approxBytes = collection.rowCount * collection.columns.length * 64; + } catch { + // stale rowCount self-corrects on next reload + } + }); } private recordAccess(baseId: string): void { @@ -562,7 +601,6 @@ function quoteIdent(name: string): string { function shapeBaseRow( raw: Record, specs: ColumnSpec[], - sortBuilds: SortBuild[], ): BaseRow { const cells: Record = {}; for (const spec of specs) {