test(server): assert duckdb cache matches postgres on a 100K-row base

Philipinho
2026-04-19 22:28:07 +01:00
parent 4636af3870
commit 55feb01249
2 changed files with 173 additions and 2 deletions
@@ -1,7 +1,7 @@
import { Test, TestingModule } from '@nestjs/testing';
import { ConfigModule } from '@nestjs/config';
import { KyselyModule, InjectKysely } from 'nestjs-kysely';
import { CamelCasePlugin } from 'kysely';
import { CamelCasePlugin, sql } from 'kysely';
import { PostgresJSDialect } from 'kysely-postgres-js';
import * as postgres from 'postgres';
import { Injectable } from '@nestjs/common';
@@ -505,6 +505,165 @@ describeIntegration('BaseQueryCacheService integration', () => {
60_000,
);
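// Opt-in scale run: needs both INTEGRATION_DB_URL and SCALE_TEST=true,
// otherwise the test is skipped.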
const itIfScale =
INTEGRATION_DB_URL && process.env.SCALE_TEST === 'true' ? it : it.skip;
itIfScale(
'100K base: cache and postgres return identical rows for common queries',
async () => {
const seeded = await seedBase({
db: dbHandle.db as any,
workspaceId,
spaceId,
creatorUserId,
rows: 100_000,
name: `cache-scale-${Date.now()}`,
});
const scaleBaseId = seeded.baseId;
try {
const properties = await basePropertyRepo.findByBaseId(scaleBaseId);
const schema: PropertySchema = new Map(
properties.map((p) => [p.id, p]),
);
const statusChoice = seeded.statusChoiceIds[0];
if (!statusChoice) throw new Error('Status choice not seeded');
// Query shapes under test. High-cardinality sort keys (text, date-with-time)
// give a near-total ordering, so strict array equality against Postgres is
// meaningful. One low-cardinality filter (status eq) exercises the filter
// path as well.
const queryShapes: Array<{
label: string;
filter?: any;
sorts?: Array<{ propertyId: string; direction: 'asc' | 'desc' }>;
}> = [
{
label: 'text sort desc (high-cardinality)',
sorts: [
{ propertyId: seeded.propertyIds.text, direction: 'desc' },
],
},
{
label: 'status eq (low-cardinality filter)',
filter: {
op: 'and',
children: [
{
propertyId: seeded.propertyIds.status,
op: 'eq',
value: statusChoice,
},
],
},
},
{
label: 'number gt 5000 + date desc',
filter: {
op: 'and',
children: [
{
propertyId: seeded.propertyIds.number,
op: 'gt',
value: 5000,
},
],
},
sorts: [
{ propertyId: seeded.propertyIds.date, direction: 'desc' },
],
},
{
label: 'number asc (tie-heavy numeric sort)',
sorts: [
{ propertyId: seeded.propertyIds.number, direction: 'asc' },
],
},
];
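// Drain every shape through cursor pagination; at 100K rows with a
// 500-row page limit that is up to 200 pages per unfiltered shape.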
const PAGE_LIMIT = 500;
// Postgres refuses to start subtransactions inside parallel workers,
// and the `base_cell_*` UDFs use PL/pgSQL EXCEPTION blocks which need
// one. At 100K rows the planner picks a parallel seq scan and crashes
// with "cannot start subtransactions during a parallel operation".
// Workaround: run all PG list calls inside a transaction that first
// disables parallel query. Tracked separately — once the UDFs are
// marked PARALLEL RESTRICTED in a migration, this wrapper can go.
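// That migration would amount to roughly one statement per UDF, e.g.
//   ALTER FUNCTION base_cell_<name>(<args>) PARALLEL RESTRICTED;
// (illustrative template only; exact function names/signatures omitted here).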
const collectPg = async (q: {
filter?: any;
sorts?: any;
}): Promise<string[]> => {
return dbHandle.db.transaction().execute(async (trx) => {
await sql`SET LOCAL max_parallel_workers_per_gather = 0`.execute(
trx,
);
const ids: string[] = [];
let cursor: string | undefined = undefined;
for (;;) {
const page = await baseRowRepo.list({
baseId: scaleBaseId,
workspaceId,
filter: q.filter,
sorts: q.sorts,
schema,
pagination: { limit: PAGE_LIMIT, cursor } as any,
trx: trx as any,
});
for (const item of page.items) ids.push(item.id);
if (!page.meta.hasNextPage || !page.meta.nextCursor) break;
cursor = page.meta.nextCursor;
}
return ids;
});
};
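// Same drain against the DuckDB cache; the parallel-query workaround above
// is PG-specific, so no transaction wrapper is needed here.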
const collectCache = async (q: {
filter?: any;
sorts?: any;
}): Promise<string[]> => {
const ids: string[] = [];
let cursor: string | undefined = undefined;
for (;;) {
const page = await cache.list(scaleBaseId, workspaceId, {
filter: q.filter,
sorts: q.sorts,
schema,
pagination: { limit: PAGE_LIMIT, cursor } as any,
});
for (const item of page.items) ids.push(item.id);
if (!page.meta.hasNextPage || !page.meta.nextCursor) break;
cursor = page.meta.nextCursor;
}
return ids;
};
for (const q of queryShapes) {
const pgIds = await collectPg(q);
const dkIds = await collectCache(q);
const pgUniq = new Set(pgIds);
const dkUniq = new Set(dkIds);
// DuckDB must never emit duplicates.
expect(dkIds.length).toBe(dkUniq.size);
if (pgIds.length === pgUniq.size) {
// Strict ordering parity when PG emits no dupes.
expect(dkIds).toEqual(pgIds);
} else {
// PG tie-sort bug surfaced duplicates — fall back to the
// unique-set comparison (same workaround as the 10K numeric-sort
// parity test). TODO: remove once the PG tie-sort duplicate
// emission is fixed (tracked separately).
expect([...dkUniq].sort()).toEqual([...pgUniq].sort());
}
}
} finally {
await deleteSeededBase(dbHandle.db as any, scaleBaseId);
}
},
300_000,
);
it(
'pubsub round-trip: BASE_ROW_UPDATED event propagates to DuckDB',
async () => {
@@ -56,7 +56,12 @@ export type SeededBase = {
notes: string;
created: string;
lastEdited: string;
// Generic aliases used by parity tests.
text: string;
number: string;
date: string;
};
statusChoiceIds: string[];
};
const SKIP_TYPES = new Set([
@@ -342,8 +347,15 @@ export async function seedBase(opts: SeedBaseOptions): Promise<SeededBase> {
notes: byName.get('Notes')!,
created: byName.get('Created')!,
lastEdited: byName.get('Last Edited')!,
text: byName.get('Title')!,
number: byName.get('Estimate')!,
date: byName.get('Due Date')!,
};
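// Expose the seeded single-select choice ids so parity tests can build
// status filters against real choices.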
const statusProp = insertedProperties.find((p) => p.name === 'Status');
const statusChoiceIds: string[] =
(statusProp?.type_options?.choices ?? []).map((c: any) => c.id);
const generators: Array<{ propertyId: string; generate: CellGenerator }> = [];
for (const prop of insertedProperties) {
const gen = buildCellGenerator(prop, rng);
@@ -382,7 +394,7 @@ export async function seedBase(opts: SeedBaseOptions): Promise<SeededBase> {
await db.insertInto('base_rows').values(rowsBatch).execute();
}
return { baseId, propertyIds };
return { baseId, propertyIds, statusChoiceIds };
}
export async function deleteSeededBase(