mirror of
https://github.com/docmost/docmost.git
synced 2026-05-07 06:23:06 +08:00
test(base): assert pure-postgres path when query cache is disabled
This commit is contained in:
@@ -6,7 +6,7 @@ import { PostgresJSDialect } from 'kysely-postgres-js';
|
|||||||
import * as postgres from 'postgres';
|
import * as postgres from 'postgres';
|
||||||
import { Injectable } from '@nestjs/common';
|
import { Injectable } from '@nestjs/common';
|
||||||
import { EventEmitter2, EventEmitterModule } from '@nestjs/event-emitter';
|
import { EventEmitter2, EventEmitterModule } from '@nestjs/event-emitter';
|
||||||
import { RedisModule } from '@nestjs-labs/nestjs-ioredis';
|
import { RedisModule, RedisService } from '@nestjs-labs/nestjs-ioredis';
|
||||||
import Redis from 'ioredis';
|
import Redis from 'ioredis';
|
||||||
import { BaseRepo } from '@docmost/db/repos/base/base.repo';
|
import { BaseRepo } from '@docmost/db/repos/base/base.repo';
|
||||||
import { BasePropertyRepo } from '@docmost/db/repos/base/base-property.repo';
|
import { BasePropertyRepo } from '@docmost/db/repos/base/base-property.repo';
|
||||||
@@ -19,6 +19,8 @@ import { CollectionLoader } from './collection-loader';
|
|||||||
import { PostgresExtensionService } from './postgres-extension.service';
|
import { PostgresExtensionService } from './postgres-extension.service';
|
||||||
import { BaseQueryCacheWriteConsumer } from './base-query-cache.write-consumer';
|
import { BaseQueryCacheWriteConsumer } from './base-query-cache.write-consumer';
|
||||||
import { BaseQueryCacheSubscriber } from './base-query-cache.subscriber';
|
import { BaseQueryCacheSubscriber } from './base-query-cache.subscriber';
|
||||||
|
import { BaseQueryRouter } from './base-query-router';
|
||||||
|
import { BaseRowService } from '../services/base-row.service';
|
||||||
import { EnvironmentService } from '../../../integrations/environment/environment.service';
|
import { EnvironmentService } from '../../../integrations/environment/environment.service';
|
||||||
import { seedBase, deleteSeededBase } from './testing/seed-base';
|
import { seedBase, deleteSeededBase } from './testing/seed-base';
|
||||||
import { PropertySchema } from '../engine';
|
import { PropertySchema } from '../engine';
|
||||||
@@ -104,6 +106,320 @@ async function isRedisReachable(): Promise<boolean> {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
describeIntegration('kill switch: BASE_QUERY_CACHE_ENABLED=false', () => {
|
||||||
|
@Injectable()
|
||||||
|
class DisabledEnvService extends FakeEnvService {
|
||||||
|
getBaseQueryCacheEnabled() {
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// Mock RedisService so the write-consumer can be constructed without a
|
||||||
|
// real Redis, and every publish call is observable. When the feature flag
|
||||||
|
// is off the consumer's @OnEvent handlers return before touching Redis at
|
||||||
|
// all, so `publish` should never receive a `base-query-cache:changes:*`
|
||||||
|
// channel. The subscriber's own bootstrap short-circuit means it never
|
||||||
|
// opens a subscriber connection either.
|
||||||
|
const mockRedisClient = {
|
||||||
|
publish: jest.fn().mockResolvedValue(0),
|
||||||
|
zadd: jest.fn().mockResolvedValue(1),
|
||||||
|
zremrangebyrank: jest.fn().mockResolvedValue(0),
|
||||||
|
zrevrange: jest.fn().mockResolvedValue([]),
|
||||||
|
};
|
||||||
|
const mockRedisService = {
|
||||||
|
getOrThrow: jest.fn(() => mockRedisClient as unknown as Redis),
|
||||||
|
getOrNil: jest.fn(() => mockRedisClient as unknown as Redis),
|
||||||
|
};
|
||||||
|
|
||||||
|
let moduleRef: TestingModule;
|
||||||
|
let dbHandle: DbHandle;
|
||||||
|
let workspaceId: string;
|
||||||
|
let spaceId: string;
|
||||||
|
let creatorUserId: string | null;
|
||||||
|
let seededBaseId: string | null = null;
|
||||||
|
let pgExtension: PostgresExtensionService;
|
||||||
|
let router: BaseQueryRouter;
|
||||||
|
let rowService: BaseRowService;
|
||||||
|
let basePropertyRepo: BasePropertyRepo;
|
||||||
|
let baseRowRepo: BaseRowRepo;
|
||||||
|
let eventEmitter: EventEmitter2;
|
||||||
|
let duckdbCreateSpy: jest.SpyInstance;
|
||||||
|
|
||||||
|
beforeAll(async () => {
|
||||||
|
process.env.DATABASE_URL = INTEGRATION_DB_URL;
|
||||||
|
// Spy BEFORE module init so onApplicationBootstrap is covered by
|
||||||
|
// toHaveBeenCalledTimes assertions.
|
||||||
|
duckdbCreateSpy = jest.spyOn(
|
||||||
|
require('@duckdb/node-api').DuckDBInstance,
|
||||||
|
'create',
|
||||||
|
);
|
||||||
|
|
||||||
|
moduleRef = await Test.createTestingModule({
|
||||||
|
imports: [
|
||||||
|
ConfigModule.forRoot({ isGlobal: true }),
|
||||||
|
KyselyModule.forRoot({
|
||||||
|
dialect: new PostgresJSDialect({
|
||||||
|
postgres: (postgres as any)(
|
||||||
|
normalizePostgresUrl(INTEGRATION_DB_URL!),
|
||||||
|
{
|
||||||
|
max: 5,
|
||||||
|
onnotice: () => {},
|
||||||
|
types: {
|
||||||
|
bigint: {
|
||||||
|
to: 20,
|
||||||
|
from: [20, 1700],
|
||||||
|
serialize: (value: number) => value.toString(),
|
||||||
|
parse: (value: string) => Number.parseInt(value),
|
||||||
|
},
|
||||||
|
},
|
||||||
|
},
|
||||||
|
),
|
||||||
|
}),
|
||||||
|
plugins: [new CamelCasePlugin()],
|
||||||
|
}),
|
||||||
|
EventEmitterModule.forRoot(),
|
||||||
|
],
|
||||||
|
providers: [
|
||||||
|
{ provide: EnvironmentService, useClass: DisabledEnvService },
|
||||||
|
{ provide: RedisService, useValue: mockRedisService },
|
||||||
|
QueryCacheConfigProvider,
|
||||||
|
PostgresExtensionService,
|
||||||
|
BaseRepo,
|
||||||
|
BasePropertyRepo,
|
||||||
|
BaseRowRepo,
|
||||||
|
BaseViewRepo,
|
||||||
|
CollectionLoader,
|
||||||
|
BaseQueryCacheService,
|
||||||
|
BaseQueryRouter,
|
||||||
|
BaseQueryCacheWriteConsumer,
|
||||||
|
BaseQueryCacheSubscriber,
|
||||||
|
BaseRowService,
|
||||||
|
DbHandle,
|
||||||
|
],
|
||||||
|
}).compile();
|
||||||
|
|
||||||
|
// Use .init() so onApplicationBootstrap runs. Under the flag-off config
|
||||||
|
// every bootstrap hook (extension install, warm-up, subscriber psubscribe)
|
||||||
|
// must short-circuit.
|
||||||
|
await moduleRef.init();
|
||||||
|
|
||||||
|
pgExtension = moduleRef.get(PostgresExtensionService);
|
||||||
|
router = moduleRef.get(BaseQueryRouter);
|
||||||
|
rowService = moduleRef.get(BaseRowService);
|
||||||
|
basePropertyRepo = moduleRef.get(BasePropertyRepo);
|
||||||
|
baseRowRepo = moduleRef.get(BaseRowRepo);
|
||||||
|
dbHandle = moduleRef.get(DbHandle);
|
||||||
|
eventEmitter = moduleRef.get(EventEmitter2);
|
||||||
|
|
||||||
|
const workspace = await dbHandle.db
|
||||||
|
.selectFrom('workspaces')
|
||||||
|
.select(['id'])
|
||||||
|
.limit(1)
|
||||||
|
.executeTakeFirstOrThrow();
|
||||||
|
workspaceId = workspace.id;
|
||||||
|
|
||||||
|
const space = await dbHandle.db
|
||||||
|
.selectFrom('spaces')
|
||||||
|
.select(['id'])
|
||||||
|
.where('workspaceId', '=', workspaceId)
|
||||||
|
.limit(1)
|
||||||
|
.executeTakeFirstOrThrow();
|
||||||
|
spaceId = space.id;
|
||||||
|
|
||||||
|
const user = await dbHandle.db
|
||||||
|
.selectFrom('users')
|
||||||
|
.select('id')
|
||||||
|
.limit(1)
|
||||||
|
.executeTakeFirst();
|
||||||
|
creatorUserId = user?.id ?? null;
|
||||||
|
|
||||||
|
const { baseId } = await seedBase({
|
||||||
|
db: dbHandle.db as any,
|
||||||
|
workspaceId,
|
||||||
|
spaceId,
|
||||||
|
creatorUserId,
|
||||||
|
rows: 200,
|
||||||
|
name: `cache-killswitch-${Date.now()}`,
|
||||||
|
});
|
||||||
|
seededBaseId = baseId;
|
||||||
|
}, 180_000);
|
||||||
|
|
||||||
|
afterAll(async () => {
|
||||||
|
if (seededBaseId && dbHandle) {
|
||||||
|
await deleteSeededBase(dbHandle.db as any, seededBaseId);
|
||||||
|
}
|
||||||
|
if (moduleRef) {
|
||||||
|
await moduleRef.close();
|
||||||
|
}
|
||||||
|
if (duckdbCreateSpy) {
|
||||||
|
duckdbCreateSpy.mockRestore();
|
||||||
|
}
|
||||||
|
}, 60_000);
|
||||||
|
|
||||||
|
beforeEach(() => {
|
||||||
|
mockRedisClient.publish.mockClear();
|
||||||
|
mockRedisClient.zadd.mockClear();
|
||||||
|
mockRedisClient.zremrangebyrank.mockClear();
|
||||||
|
mockRedisClient.zrevrange.mockClear();
|
||||||
|
});
|
||||||
|
|
||||||
|
it(
|
||||||
|
'never creates a DuckDB instance and never installs the extension',
|
||||||
|
async () => {
|
||||||
|
// Bootstrap has already run via moduleRef.init() in beforeAll. If any
|
||||||
|
// bootstrap hook forgot to gate on the feature flag, DuckDBInstance.create
|
||||||
|
// would have fired at least once.
|
||||||
|
expect(duckdbCreateSpy).toHaveBeenCalledTimes(0);
|
||||||
|
expect(pgExtension.isReady()).toBe(false);
|
||||||
|
|
||||||
|
const properties = await basePropertyRepo.findByBaseId(seededBaseId!);
|
||||||
|
const estimateProp = properties.find((p) => p.name === 'Estimate');
|
||||||
|
if (!estimateProp) throw new Error('Estimate property not found');
|
||||||
|
|
||||||
|
// A query that would qualify for the cache (rowCount >= minRows, has a
|
||||||
|
// filter and a sort). With the flag off, the router must answer
|
||||||
|
// 'postgres' and nothing along that path may construct a DuckDB instance.
|
||||||
|
const decision = await router.decide({
|
||||||
|
baseId: seededBaseId!,
|
||||||
|
workspaceId,
|
||||||
|
filter: {
|
||||||
|
op: 'and',
|
||||||
|
children: [
|
||||||
|
{
|
||||||
|
propertyId: estimateProp.id,
|
||||||
|
op: 'gt',
|
||||||
|
value: 0,
|
||||||
|
},
|
||||||
|
],
|
||||||
|
} as any,
|
||||||
|
sorts: [{ propertyId: estimateProp.id, direction: 'asc' }],
|
||||||
|
});
|
||||||
|
expect(decision).toBe('postgres');
|
||||||
|
|
||||||
|
// Run the full service-level list path too, to cover the code branch
|
||||||
|
// that would have called into the cache if the flag were on.
|
||||||
|
await rowService.list(
|
||||||
|
{
|
||||||
|
baseId: seededBaseId!,
|
||||||
|
sorts: [{ propertyId: estimateProp.id, direction: 'asc' }],
|
||||||
|
filter: {
|
||||||
|
op: 'and',
|
||||||
|
children: [
|
||||||
|
{
|
||||||
|
propertyId: estimateProp.id,
|
||||||
|
op: 'gt',
|
||||||
|
value: 0,
|
||||||
|
},
|
||||||
|
],
|
||||||
|
},
|
||||||
|
} as any,
|
||||||
|
{ limit: 50 } as any,
|
||||||
|
workspaceId,
|
||||||
|
);
|
||||||
|
|
||||||
|
expect(duckdbCreateSpy).toHaveBeenCalledTimes(0);
|
||||||
|
},
|
||||||
|
60_000,
|
||||||
|
);
|
||||||
|
|
||||||
|
it(
|
||||||
|
'list requests return identical results to BaseRowService.list (pure postgres path)',
|
||||||
|
async () => {
|
||||||
|
const baseId = seededBaseId!;
|
||||||
|
const properties = await basePropertyRepo.findByBaseId(baseId);
|
||||||
|
const schema: PropertySchema = new Map(properties.map((p) => [p.id, p]));
|
||||||
|
const estimateProp = properties.find((p) => p.name === 'Estimate');
|
||||||
|
if (!estimateProp) throw new Error('Estimate property not found');
|
||||||
|
|
||||||
|
const sorts = [
|
||||||
|
{ propertyId: estimateProp.id, direction: 'asc' as const },
|
||||||
|
];
|
||||||
|
const filter = {
|
||||||
|
op: 'and' as const,
|
||||||
|
children: [
|
||||||
|
{
|
||||||
|
propertyId: estimateProp.id,
|
||||||
|
op: 'gt' as const,
|
||||||
|
value: 0,
|
||||||
|
},
|
||||||
|
],
|
||||||
|
};
|
||||||
|
|
||||||
|
// Route through BaseRowService — with the flag off, the router returns
|
||||||
|
// 'postgres' and the service delegates straight to baseRowRepo.list.
|
||||||
|
const viaService = await rowService.list(
|
||||||
|
{ baseId, sorts, filter } as any,
|
||||||
|
{ limit: 50 } as any,
|
||||||
|
workspaceId,
|
||||||
|
);
|
||||||
|
|
||||||
|
// Call the repo directly with the same arguments — this is the pre-
|
||||||
|
// DuckDB Docmost path. The two must agree item-for-item AND on meta.
|
||||||
|
const viaRepo = await baseRowRepo.list({
|
||||||
|
baseId,
|
||||||
|
workspaceId,
|
||||||
|
sorts,
|
||||||
|
filter: filter as any,
|
||||||
|
schema,
|
||||||
|
pagination: { limit: 50 } as any,
|
||||||
|
});
|
||||||
|
|
||||||
|
expect(viaService.items.map((r) => r.id)).toEqual(
|
||||||
|
viaRepo.items.map((r) => r.id),
|
||||||
|
);
|
||||||
|
expect(viaService.meta).toEqual(viaRepo.meta);
|
||||||
|
// Spy parity: nothing along either path should have touched DuckDB.
|
||||||
|
expect(duckdbCreateSpy).toHaveBeenCalledTimes(0);
|
||||||
|
},
|
||||||
|
60_000,
|
||||||
|
);
|
||||||
|
|
||||||
|
it(
|
||||||
|
'does not subscribe to or publish on Redis cache-change channels',
|
||||||
|
async () => {
|
||||||
|
const baseId = seededBaseId!;
|
||||||
|
const properties = await basePropertyRepo.findByBaseId(baseId);
|
||||||
|
const estimateProp = properties.find((p) => p.name === 'Estimate');
|
||||||
|
if (!estimateProp) throw new Error('Estimate property not found');
|
||||||
|
|
||||||
|
// Pick any existing row and emit a BASE_ROW_UPDATED the way the
|
||||||
|
// BaseRowService normally would. The write-consumer listens via
|
||||||
|
// @OnEvent and will try to publish only if the flag is on.
|
||||||
|
const targetRow = await dbHandle.db
|
||||||
|
.selectFrom('baseRows')
|
||||||
|
.select(['id'])
|
||||||
|
.where('baseId', '=', baseId)
|
||||||
|
.where('workspaceId', '=', workspaceId)
|
||||||
|
.limit(1)
|
||||||
|
.executeTakeFirstOrThrow();
|
||||||
|
|
||||||
|
const event: BaseRowUpdatedEvent = {
|
||||||
|
baseId,
|
||||||
|
workspaceId,
|
||||||
|
actorId: null,
|
||||||
|
requestId: null,
|
||||||
|
rowId: targetRow.id,
|
||||||
|
patch: { [estimateProp.id]: 1 },
|
||||||
|
updatedCells: { [estimateProp.id]: 1 },
|
||||||
|
};
|
||||||
|
// emitAsync so any awaited @OnEvent handler has a chance to run (and
|
||||||
|
// short-circuit) before we assert.
|
||||||
|
await eventEmitter.emitAsync(EventName.BASE_ROW_UPDATED, event);
|
||||||
|
|
||||||
|
const cacheChannelPublishes = mockRedisClient.publish.mock.calls.filter(
|
||||||
|
([channel]) =>
|
||||||
|
typeof channel === 'string' &&
|
||||||
|
channel.startsWith('base-query-cache:changes:'),
|
||||||
|
);
|
||||||
|
expect(cacheChannelPublishes).toHaveLength(0);
|
||||||
|
// Stronger claim: the consumer exits before ever resolving the client,
|
||||||
|
// so getOrThrow on our mock RedisService should also be untouched.
|
||||||
|
expect(mockRedisService.getOrThrow).not.toHaveBeenCalled();
|
||||||
|
},
|
||||||
|
60_000,
|
||||||
|
);
|
||||||
|
});
|
||||||
|
|
||||||
describeIntegration('BaseQueryCacheService LRU eviction', () => {
|
describeIntegration('BaseQueryCacheService LRU eviction', () => {
|
||||||
@Injectable()
|
@Injectable()
|
||||||
class TinyCapEnvService {
|
class TinyCapEnvService {
|
||||||
|
|||||||
Reference in New Issue
Block a user