fix(base): serialize writer operations and prune dead code in cache service

This commit is contained in:
Philipinho
2026-04-23 16:50:11 +01:00
parent 38cd94b2d7
commit dbc1eb539c
@@ -52,6 +52,30 @@ export class BaseQueryCacheService
private readonly collections = new Map<string, LoadedCollection>(); private readonly collections = new Map<string, LoadedCollection>();
private readonly inFlightLoads = new Map<string, Promise<LoadedCollection>>(); private readonly inFlightLoads = new Map<string, Promise<LoadedCollection>>();
/*
* Serializes every write-path call into the shared writer connection.
* DuckDB connections aren't thread-safe for concurrent prepared statements,
* and Redis pub/sub can fire `applyChange` calls concurrently since the
* subscriber's `pmessage` handler doesn't await. We funnel all writes
* (`upsertRow`, `deleteRow`, `updatePosition`, `refreshRowCount`,
* `invalidate`, `evictLru`) through this simple Promise chain so only
* one is in flight at a time. Reads are unaffected — they flow through
* the reader pool, which handles its own concurrency.
*/
private writeQueue: Promise<void> = Promise.resolve();
private async serializeWrite<T>(fn: () => Promise<T>): Promise<T> {
const prev = this.writeQueue;
let unblock!: () => void;
this.writeQueue = new Promise<void>((resolve) => { unblock = resolve; });
try {
await prev;
return await fn();
} finally {
unblock();
}
}
constructor( constructor(
private readonly configProvider: QueryCacheConfigProvider, private readonly configProvider: QueryCacheConfigProvider,
private readonly baseRepo: BaseRepo, private readonly baseRepo: BaseRepo,
@@ -204,7 +228,7 @@ export class BaseQueryCacheService
const tShape = debug ? Date.now() : 0; const tShape = debug ? Date.now() : 0;
const items = duckRows.map((r) => const items = duckRows.map((r) =>
shapeBaseRow(r, collection.columns, sortBuilds), shapeBaseRow(r, collection.columns),
); );
const shapeMs = debug ? Date.now() - tShape : 0; const shapeMs = debug ? Date.now() - tShape : 0;
@@ -252,7 +276,9 @@ export class BaseQueryCacheService
async invalidate(baseId: string): Promise<void> { async invalidate(baseId: string): Promise<void> {
const collection = this.collections.get(baseId); const collection = this.collections.get(baseId);
if (!collection) return; if (!collection) return;
await this.runtime.detachBase(collection.schema); await this.serializeWrite(async () => {
await this.runtime.detachBase(collection.schema);
});
this.collections.delete(baseId); this.collections.delete(baseId);
} }
@@ -372,7 +398,9 @@ export class BaseQueryCacheService
} }
if (existing) { if (existing) {
await this.runtime.detachBase(existing.schema); await this.serializeWrite(async () => {
await this.runtime.detachBase(existing.schema);
});
this.collections.delete(baseId); this.collections.delete(baseId);
} }
@@ -428,7 +456,9 @@ export class BaseQueryCacheService
} }
if (oldestKey) { if (oldestKey) {
const col = this.collections.get(oldestKey)!; const col = this.collections.get(oldestKey)!;
await this.runtime.detachBase(col.schema); await this.serializeWrite(async () => {
await this.runtime.detachBase(col.schema);
});
this.collections.delete(oldestKey); this.collections.delete(oldestKey);
this.logger.debug(`Evicted LRU collection ${oldestKey}`); this.logger.debug(`Evicted LRU collection ${oldestKey}`);
} }
@@ -438,58 +468,62 @@ export class BaseQueryCacheService
collection: LoadedCollection, collection: LoadedCollection,
row: Record<string, unknown>, row: Record<string, unknown>,
): Promise<void> { ): Promise<void> {
const specs = collection.columns; return this.serializeWrite(async () => {
const columnList = specs.map((s) => quoteIdent(s.column)).join(', '); const specs = collection.columns;
const placeholders = specs.map(() => '?').join(', '); const columnList = specs.map((s) => quoteIdent(s.column)).join(', ');
const sql = `INSERT OR REPLACE INTO ${collection.schema}.rows (${columnList}) VALUES (${placeholders})`; const placeholders = specs.map(() => '?').join(', ');
const sql = `INSERT OR REPLACE INTO ${collection.schema}.rows (${columnList}) VALUES (${placeholders})`;
const writer = this.runtime.getWriter(); const writer = this.runtime.getWriter();
const prepared = await writer.prepare(sql); const prepared = await writer.prepare(sql);
for (let i = 0; i < specs.length; i++) { for (let i = 0; i < specs.length; i++) {
const spec = specs[i]; const spec = specs[i];
const oneBased = i + 1; const oneBased = i + 1;
const raw = readFromRowEvent(row, spec); const raw = readFromRowEvent(row, spec);
if (raw == null) { if (raw == null) {
prepared.bindNull(oneBased); prepared.bindNull(oneBased);
continue; continue;
}
switch (spec.ddlType) {
case 'VARCHAR':
prepared.bindVarchar(oneBased, String(raw));
break;
case 'DOUBLE': {
const n = Number(raw);
if (Number.isNaN(n)) prepared.bindNull(oneBased);
else prepared.bindDouble(oneBased, n);
break;
} }
case 'BOOLEAN': switch (spec.ddlType) {
prepared.bindBoolean(oneBased, Boolean(raw)); case 'VARCHAR':
break; prepared.bindVarchar(oneBased, String(raw));
case 'TIMESTAMPTZ': { break;
const d = raw instanceof Date ? raw : new Date(String(raw)); case 'DOUBLE': {
if (Number.isNaN(d.getTime())) prepared.bindNull(oneBased); const n = Number(raw);
else prepared.bindVarchar(oneBased, d.toISOString()); if (Number.isNaN(n)) prepared.bindNull(oneBased);
break; else prepared.bindDouble(oneBased, n);
break;
}
case 'BOOLEAN':
prepared.bindBoolean(oneBased, Boolean(raw));
break;
case 'TIMESTAMPTZ': {
const d = raw instanceof Date ? raw : new Date(String(raw));
if (Number.isNaN(d.getTime())) prepared.bindNull(oneBased);
else prepared.bindVarchar(oneBased, d.toISOString());
break;
}
case 'JSON':
prepared.bindVarchar(oneBased, JSON.stringify(raw));
break;
} }
case 'JSON':
prepared.bindVarchar(oneBased, JSON.stringify(raw));
break;
} }
} await prepared.run();
await prepared.run(); });
} }
private async deleteRow( private async deleteRow(
collection: LoadedCollection, collection: LoadedCollection,
rowId: string, rowId: string,
): Promise<void> { ): Promise<void> {
const writer = this.runtime.getWriter(); return this.serializeWrite(async () => {
const prepared = await writer.prepare( const writer = this.runtime.getWriter();
`DELETE FROM ${collection.schema}.rows WHERE id = ?`, const prepared = await writer.prepare(
); `DELETE FROM ${collection.schema}.rows WHERE id = ?`,
prepared.bindVarchar(1, rowId); );
await prepared.run(); prepared.bindVarchar(1, rowId);
await prepared.run();
});
} }
private async updatePosition( private async updatePosition(
@@ -497,25 +531,30 @@ export class BaseQueryCacheService
rowId: string, rowId: string,
position: string, position: string,
): Promise<void> { ): Promise<void> {
const writer = this.runtime.getWriter(); return this.serializeWrite(async () => {
const prepared = await writer.prepare( const writer = this.runtime.getWriter();
`UPDATE ${collection.schema}.rows SET position = ? WHERE id = ?`, const prepared = await writer.prepare(
); `UPDATE ${collection.schema}.rows SET position = ? WHERE id = ?`,
prepared.bindVarchar(1, position); );
prepared.bindVarchar(2, rowId); prepared.bindVarchar(1, position);
await prepared.run(); prepared.bindVarchar(2, rowId);
await prepared.run();
});
} }
private async refreshRowCount(collection: LoadedCollection): Promise<void> { private async refreshRowCount(collection: LoadedCollection): Promise<void> {
try { return this.serializeWrite(async () => {
const res = await this.runtime.getWriter().runAndReadAll( try {
`SELECT count(*) AS c FROM ${collection.schema}.rows`, const res = await this.runtime.getWriter().runAndReadAll(
); `SELECT count(*) AS c FROM ${collection.schema}.rows`,
const row = res.getRowObjects()[0] as { c: bigint | number }; );
collection.rowCount = Number(row.c); const row = res.getRowObjects()[0] as { c: bigint | number };
} catch { collection.rowCount = Number(row.c);
// stale rowCount self-corrects on next reload collection.approxBytes = collection.rowCount * collection.columns.length * 64;
} } catch {
// stale rowCount self-corrects on next reload
}
});
} }
private recordAccess(baseId: string): void { private recordAccess(baseId: string): void {
@@ -562,7 +601,6 @@ function quoteIdent(name: string): string {
function shapeBaseRow( function shapeBaseRow(
raw: Record<string, unknown>, raw: Record<string, unknown>,
specs: ColumnSpec[], specs: ColumnSpec[],
sortBuilds: SortBuild[],
): BaseRow { ): BaseRow {
const cells: Record<string, unknown> = {}; const cells: Record<string, unknown> = {};
for (const spec of specs) { for (const spec of specs) {