diff --git a/apps/server/src/core/base/query-cache/base-query-cache.integration.spec.ts b/apps/server/src/core/base/query-cache/base-query-cache.integration.spec.ts index 92fd99ee..071dbbb5 100644 --- a/apps/server/src/core/base/query-cache/base-query-cache.integration.spec.ts +++ b/apps/server/src/core/base/query-cache/base-query-cache.integration.spec.ts @@ -6,7 +6,7 @@ import { PostgresJSDialect } from 'kysely-postgres-js'; import * as postgres from 'postgres'; import { Injectable } from '@nestjs/common'; import { EventEmitter2, EventEmitterModule } from '@nestjs/event-emitter'; -import { RedisModule } from '@nestjs-labs/nestjs-ioredis'; +import { RedisModule, RedisService } from '@nestjs-labs/nestjs-ioredis'; import Redis from 'ioredis'; import { BaseRepo } from '@docmost/db/repos/base/base.repo'; import { BasePropertyRepo } from '@docmost/db/repos/base/base-property.repo'; @@ -19,6 +19,8 @@ import { CollectionLoader } from './collection-loader'; import { PostgresExtensionService } from './postgres-extension.service'; import { BaseQueryCacheWriteConsumer } from './base-query-cache.write-consumer'; import { BaseQueryCacheSubscriber } from './base-query-cache.subscriber'; +import { BaseQueryRouter } from './base-query-router'; +import { BaseRowService } from '../services/base-row.service'; import { EnvironmentService } from '../../../integrations/environment/environment.service'; import { seedBase, deleteSeededBase } from './testing/seed-base'; import { PropertySchema } from '../engine'; @@ -104,6 +106,320 @@ async function isRedisReachable(): Promise { } } +describeIntegration('kill switch: BASE_QUERY_CACHE_ENABLED=false', () => { + @Injectable() + class DisabledEnvService extends FakeEnvService { + getBaseQueryCacheEnabled() { + return false; + } + } + + // Mock RedisService so the write-consumer can be constructed without a + // real Redis, and every publish call is observable. When the feature flag + // is off the consumer's @OnEvent handlers return before touching Redis at + // all, so `publish` should never receive a `base-query-cache:changes:*` + // channel. The subscriber's own bootstrap short-circuit means it never + // opens a subscriber connection either. + const mockRedisClient = { + publish: jest.fn().mockResolvedValue(0), + zadd: jest.fn().mockResolvedValue(1), + zremrangebyrank: jest.fn().mockResolvedValue(0), + zrevrange: jest.fn().mockResolvedValue([]), + }; + const mockRedisService = { + getOrThrow: jest.fn(() => mockRedisClient as unknown as Redis), + getOrNil: jest.fn(() => mockRedisClient as unknown as Redis), + }; + + let moduleRef: TestingModule; + let dbHandle: DbHandle; + let workspaceId: string; + let spaceId: string; + let creatorUserId: string | null; + let seededBaseId: string | null = null; + let pgExtension: PostgresExtensionService; + let router: BaseQueryRouter; + let rowService: BaseRowService; + let basePropertyRepo: BasePropertyRepo; + let baseRowRepo: BaseRowRepo; + let eventEmitter: EventEmitter2; + let duckdbCreateSpy: jest.SpyInstance; + + beforeAll(async () => { + process.env.DATABASE_URL = INTEGRATION_DB_URL; + // Spy BEFORE module init so onApplicationBootstrap is covered by + // toHaveBeenCalledTimes assertions. + duckdbCreateSpy = jest.spyOn( + require('@duckdb/node-api').DuckDBInstance, + 'create', + ); + + moduleRef = await Test.createTestingModule({ + imports: [ + ConfigModule.forRoot({ isGlobal: true }), + KyselyModule.forRoot({ + dialect: new PostgresJSDialect({ + postgres: (postgres as any)( + normalizePostgresUrl(INTEGRATION_DB_URL!), + { + max: 5, + onnotice: () => {}, + types: { + bigint: { + to: 20, + from: [20, 1700], + serialize: (value: number) => value.toString(), + parse: (value: string) => Number.parseInt(value), + }, + }, + }, + ), + }), + plugins: [new CamelCasePlugin()], + }), + EventEmitterModule.forRoot(), + ], + providers: [ + { provide: EnvironmentService, useClass: DisabledEnvService }, + { provide: RedisService, useValue: mockRedisService }, + QueryCacheConfigProvider, + PostgresExtensionService, + BaseRepo, + BasePropertyRepo, + BaseRowRepo, + BaseViewRepo, + CollectionLoader, + BaseQueryCacheService, + BaseQueryRouter, + BaseQueryCacheWriteConsumer, + BaseQueryCacheSubscriber, + BaseRowService, + DbHandle, + ], + }).compile(); + + // Use .init() so onApplicationBootstrap runs. Under the flag-off config + // every bootstrap hook (extension install, warm-up, subscriber psubscribe) + // must short-circuit. + await moduleRef.init(); + + pgExtension = moduleRef.get(PostgresExtensionService); + router = moduleRef.get(BaseQueryRouter); + rowService = moduleRef.get(BaseRowService); + basePropertyRepo = moduleRef.get(BasePropertyRepo); + baseRowRepo = moduleRef.get(BaseRowRepo); + dbHandle = moduleRef.get(DbHandle); + eventEmitter = moduleRef.get(EventEmitter2); + + const workspace = await dbHandle.db + .selectFrom('workspaces') + .select(['id']) + .limit(1) + .executeTakeFirstOrThrow(); + workspaceId = workspace.id; + + const space = await dbHandle.db + .selectFrom('spaces') + .select(['id']) + .where('workspaceId', '=', workspaceId) + .limit(1) + .executeTakeFirstOrThrow(); + spaceId = space.id; + + const user = await dbHandle.db + .selectFrom('users') + .select('id') + .limit(1) + .executeTakeFirst(); + creatorUserId = user?.id ?? null; + + const { baseId } = await seedBase({ + db: dbHandle.db as any, + workspaceId, + spaceId, + creatorUserId, + rows: 200, + name: `cache-killswitch-${Date.now()}`, + }); + seededBaseId = baseId; + }, 180_000); + + afterAll(async () => { + if (seededBaseId && dbHandle) { + await deleteSeededBase(dbHandle.db as any, seededBaseId); + } + if (moduleRef) { + await moduleRef.close(); + } + if (duckdbCreateSpy) { + duckdbCreateSpy.mockRestore(); + } + }, 60_000); + + beforeEach(() => { + mockRedisClient.publish.mockClear(); + mockRedisClient.zadd.mockClear(); + mockRedisClient.zremrangebyrank.mockClear(); + mockRedisClient.zrevrange.mockClear(); + }); + + it( + 'never creates a DuckDB instance and never installs the extension', + async () => { + // Bootstrap has already run via moduleRef.init() in beforeAll. If any + // bootstrap hook forgot to gate on the feature flag, DuckDBInstance.create + // would have fired at least once. + expect(duckdbCreateSpy).toHaveBeenCalledTimes(0); + expect(pgExtension.isReady()).toBe(false); + + const properties = await basePropertyRepo.findByBaseId(seededBaseId!); + const estimateProp = properties.find((p) => p.name === 'Estimate'); + if (!estimateProp) throw new Error('Estimate property not found'); + + // A query that would qualify for the cache (rowCount >= minRows, has a + // filter and a sort). With the flag off, the router must answer + // 'postgres' and nothing along that path may construct a DuckDB instance. + const decision = await router.decide({ + baseId: seededBaseId!, + workspaceId, + filter: { + op: 'and', + children: [ + { + propertyId: estimateProp.id, + op: 'gt', + value: 0, + }, + ], + } as any, + sorts: [{ propertyId: estimateProp.id, direction: 'asc' }], + }); + expect(decision).toBe('postgres'); + + // Run the full service-level list path too, to cover the code branch + // that would have called into the cache if the flag were on. + await rowService.list( + { + baseId: seededBaseId!, + sorts: [{ propertyId: estimateProp.id, direction: 'asc' }], + filter: { + op: 'and', + children: [ + { + propertyId: estimateProp.id, + op: 'gt', + value: 0, + }, + ], + }, + } as any, + { limit: 50 } as any, + workspaceId, + ); + + expect(duckdbCreateSpy).toHaveBeenCalledTimes(0); + }, + 60_000, + ); + + it( + 'list requests return identical results to BaseRowService.list (pure postgres path)', + async () => { + const baseId = seededBaseId!; + const properties = await basePropertyRepo.findByBaseId(baseId); + const schema: PropertySchema = new Map(properties.map((p) => [p.id, p])); + const estimateProp = properties.find((p) => p.name === 'Estimate'); + if (!estimateProp) throw new Error('Estimate property not found'); + + const sorts = [ + { propertyId: estimateProp.id, direction: 'asc' as const }, + ]; + const filter = { + op: 'and' as const, + children: [ + { + propertyId: estimateProp.id, + op: 'gt' as const, + value: 0, + }, + ], + }; + + // Route through BaseRowService — with the flag off, the router returns + // 'postgres' and the service delegates straight to baseRowRepo.list. + const viaService = await rowService.list( + { baseId, sorts, filter } as any, + { limit: 50 } as any, + workspaceId, + ); + + // Call the repo directly with the same arguments — this is the pre- + // DuckDB Docmost path. The two must agree item-for-item AND on meta. + const viaRepo = await baseRowRepo.list({ + baseId, + workspaceId, + sorts, + filter: filter as any, + schema, + pagination: { limit: 50 } as any, + }); + + expect(viaService.items.map((r) => r.id)).toEqual( + viaRepo.items.map((r) => r.id), + ); + expect(viaService.meta).toEqual(viaRepo.meta); + // Spy parity: nothing along either path should have touched DuckDB. + expect(duckdbCreateSpy).toHaveBeenCalledTimes(0); + }, + 60_000, + ); + + it( + 'does not subscribe to or publish on Redis cache-change channels', + async () => { + const baseId = seededBaseId!; + const properties = await basePropertyRepo.findByBaseId(baseId); + const estimateProp = properties.find((p) => p.name === 'Estimate'); + if (!estimateProp) throw new Error('Estimate property not found'); + + // Pick any existing row and emit a BASE_ROW_UPDATED the way the + // BaseRowService normally would. The write-consumer listens via + // @OnEvent and will try to publish only if the flag is on. + const targetRow = await dbHandle.db + .selectFrom('baseRows') + .select(['id']) + .where('baseId', '=', baseId) + .where('workspaceId', '=', workspaceId) + .limit(1) + .executeTakeFirstOrThrow(); + + const event: BaseRowUpdatedEvent = { + baseId, + workspaceId, + actorId: null, + requestId: null, + rowId: targetRow.id, + patch: { [estimateProp.id]: 1 }, + updatedCells: { [estimateProp.id]: 1 }, + }; + // emitAsync so any awaited @OnEvent handler has a chance to run (and + // short-circuit) before we assert. + await eventEmitter.emitAsync(EventName.BASE_ROW_UPDATED, event); + + const cacheChannelPublishes = mockRedisClient.publish.mock.calls.filter( + ([channel]) => + typeof channel === 'string' && + channel.startsWith('base-query-cache:changes:'), + ); + expect(cacheChannelPublishes).toHaveLength(0); + // Stronger claim: the consumer exits before ever resolving the client, + // so getOrThrow on our mock RedisService should also be untouched. + expect(mockRedisService.getOrThrow).not.toHaveBeenCalled(); + }, + 60_000, + ); +}); + describeIntegration('BaseQueryCacheService LRU eviction', () => { @Injectable() class TinyCapEnvService {