From 55feb01249be3ac7f61bf3a21c941ab99e4038a3 Mon Sep 17 00:00:00 2001
From: Philipinho <16838612+Philipinho@users.noreply.github.com>
Date: Sun, 19 Apr 2026 22:28:07 +0100
Subject: [PATCH] test(server): assert duckdb cache matches postgres on a
 100K-row base

---
 .../base-query-cache.integration.spec.ts      | 161 +++++++++++++++++-
 .../base/query-cache/testing/seed-base.ts     |  14 +-
 2 files changed, 173 insertions(+), 2 deletions(-)

diff --git a/apps/server/src/core/base/query-cache/base-query-cache.integration.spec.ts b/apps/server/src/core/base/query-cache/base-query-cache.integration.spec.ts
index b91fe00e..b01e76bc 100644
--- a/apps/server/src/core/base/query-cache/base-query-cache.integration.spec.ts
+++ b/apps/server/src/core/base/query-cache/base-query-cache.integration.spec.ts
@@ -1,7 +1,7 @@
 import { Test, TestingModule } from '@nestjs/testing';
 import { ConfigModule } from '@nestjs/config';
 import { KyselyModule, InjectKysely } from 'nestjs-kysely';
-import { CamelCasePlugin } from 'kysely';
+import { CamelCasePlugin, sql } from 'kysely';
 import { PostgresJSDialect } from 'kysely-postgres-js';
 import * as postgres from 'postgres';
 import { Injectable } from '@nestjs/common';
@@ -505,6 +505,165 @@ describeIntegration('BaseQueryCacheService integration', () => {
     60_000,
   );
 
+  const itIfScale =
+    INTEGRATION_DB_URL && process.env.SCALE_TEST === 'true' ? it : it.skip;
+
+  itIfScale(
+    '100K base: cache and postgres return identical rows for common queries',
+    async () => {
+      const seeded = await seedBase({
+        db: dbHandle.db as any,
+        workspaceId,
+        spaceId,
+        creatorUserId,
+        rows: 100_000,
+        name: `cache-scale-${Date.now()}`,
+      });
+      const scaleBaseId = seeded.baseId;
+      try {
+        const properties = await basePropertyRepo.findByBaseId(scaleBaseId);
+        const schema: PropertySchema = new Map(
+          properties.map((p) => [p.id, p]),
+        );
+
+        const statusChoice = seeded.statusChoiceIds[0];
+        if (!statusChoice) throw new Error('Status choice not seeded');
+
+        // Query shapes. High-cardinality sort keys (text, date-with-time) are
+        // preferred for strict-array-equality parity. We include one
+        // low-cardinality filter (status eq) to exercise that path as well.
+        const queryShapes: Array<{
+          label: string;
+          filter?: any;
+          sorts?: Array<{ propertyId: string; direction: 'asc' | 'desc' }>;
+        }> = [
+          {
+            label: 'text sort desc (high-cardinality)',
+            sorts: [
+              { propertyId: seeded.propertyIds.text, direction: 'desc' },
+            ],
+          },
+          {
+            label: 'status eq (low-cardinality filter)',
+            filter: {
+              op: 'and',
+              children: [
+                {
+                  propertyId: seeded.propertyIds.status,
+                  op: 'eq',
+                  value: statusChoice,
+                },
+              ],
+            },
+          },
+          {
+            label: 'number gt 5000 + date desc',
+            filter: {
+              op: 'and',
+              children: [
+                {
+                  propertyId: seeded.propertyIds.number,
+                  op: 'gt',
+                  value: 5000,
+                },
+              ],
+            },
+            sorts: [
+              { propertyId: seeded.propertyIds.date, direction: 'desc' },
+            ],
+          },
+          {
+            label: 'number asc (tie-heavy numeric sort)',
+            sorts: [
+              { propertyId: seeded.propertyIds.number, direction: 'asc' },
+            ],
+          },
+        ];
+
+        const PAGE_LIMIT = 500;
+
+        // Postgres refuses to start subtransactions inside parallel workers,
+        // and the `base_cell_*` UDFs use PL/pgSQL EXCEPTION blocks which need
+        // one. At 100K rows the planner picks a parallel seq scan and crashes
+        // with "cannot start subtransactions during a parallel operation".
+        // Workaround: run all PG list calls inside a transaction that first
+        // disables parallel query. Tracked separately — once the UDFs are
+        // marked PARALLEL RESTRICTED in a migration, this wrapper can go.
+        const collectPg = async (q: {
+          filter?: any;
+          sorts?: any;
+        }): Promise<string[]> => {
+          return dbHandle.db.transaction().execute(async (trx) => {
+            await sql`SET LOCAL max_parallel_workers_per_gather = 0`.execute(
+              trx,
+            );
+            const ids: string[] = [];
+            let cursor: string | undefined = undefined;
+            for (;;) {
+              const page = await baseRowRepo.list({
+                baseId: scaleBaseId,
+                workspaceId,
+                filter: q.filter,
+                sorts: q.sorts,
+                schema,
+                pagination: { limit: PAGE_LIMIT, cursor } as any,
+                trx: trx as any,
+              });
+              for (const item of page.items) ids.push(item.id);
+              if (!page.meta.hasNextPage || !page.meta.nextCursor) break;
+              cursor = page.meta.nextCursor;
+            }
+            return ids;
+          });
+        };
+
+        const collectCache = async (q: {
+          filter?: any;
+          sorts?: any;
+        }): Promise<string[]> => {
+          const ids: string[] = [];
+          let cursor: string | undefined = undefined;
+          for (;;) {
+            const page = await cache.list(scaleBaseId, workspaceId, {
+              filter: q.filter,
+              sorts: q.sorts,
+              schema,
+              pagination: { limit: PAGE_LIMIT, cursor } as any,
+            });
+            for (const item of page.items) ids.push(item.id);
+            if (!page.meta.hasNextPage || !page.meta.nextCursor) break;
+            cursor = page.meta.nextCursor;
+          }
+          return ids;
+        };
+
+        for (const q of queryShapes) {
+          const pgIds = await collectPg(q);
+          const dkIds = await collectCache(q);
+
+          const pgUniq = new Set(pgIds);
+          const dkUniq = new Set(dkIds);
+          // DuckDB must never emit duplicates.
+          expect(dkIds.length).toBe(dkUniq.size);
+
+          if (pgIds.length === pgUniq.size) {
+            // Strict ordering parity when PG emits no dupes.
+            expect(dkIds).toEqual(pgIds);
+          } else {
+            // PG tie-sort bug surfaced duplicates — fall back to the
+            // unique-set comparison (same workaround as the 10K numeric-sort
+            // parity test). TODO: remove once the PG tie-sort duplicate
+            // emission is fixed (tracked separately).
+            expect([...dkUniq].sort()).toEqual([...pgUniq].sort());
+          }
+        }
+      } finally {
+        await deleteSeededBase(dbHandle.db as any, scaleBaseId);
+      }
+    },
+    300_000,
+  );
+
   it(
     'pubsub round-trip: BASE_ROW_UPDATED event propagates to DuckDB',
     async () => {
diff --git a/apps/server/src/core/base/query-cache/testing/seed-base.ts b/apps/server/src/core/base/query-cache/testing/seed-base.ts
index 382e755f..3c629049 100644
--- a/apps/server/src/core/base/query-cache/testing/seed-base.ts
+++ b/apps/server/src/core/base/query-cache/testing/seed-base.ts
@@ -56,7 +56,12 @@ export type SeededBase = {
     notes: string;
     created: string;
     lastEdited: string;
+    // Generic aliases used by parity tests.
+    text: string;
+    number: string;
+    date: string;
   };
+  statusChoiceIds: string[];
 };
 
 const SKIP_TYPES = new Set([
@@ -342,8 +347,15 @@ export async function seedBase(opts: SeedBaseOptions): Promise<SeededBase> {
     notes: byName.get('Notes')!,
     created: byName.get('Created')!,
     lastEdited: byName.get('Last Edited')!,
+    text: byName.get('Title')!,
+    number: byName.get('Estimate')!,
+    date: byName.get('Due Date')!,
   };
 
+  const statusProp = insertedProperties.find((p) => p.name === 'Status');
+  const statusChoiceIds: string[] =
+    (statusProp?.type_options?.choices ?? []).map((c: any) => c.id);
+
   const generators: Array<{ propertyId: string; generate: CellGenerator }> = [];
   for (const prop of insertedProperties) {
     const gen = buildCellGenerator(prop, rng);
@@ -382,7 +394,7 @@ export async function seedBase(opts: SeedBaseOptions): Promise<SeededBase> {
     await db.insertInto('base_rows').values(rowsBatch).execute();
   }
 
-  return { baseId, propertyIds };
+  return { baseId, propertyIds, statusChoiceIds };
 }
 
 export async function deleteSeededBase(