diff --git a/apps/server/src/core/base/query-cache/base-query-cache.integration.spec.ts b/apps/server/src/core/base/query-cache/base-query-cache.integration.spec.ts index e0a1bbf6..b91fe00e 100644 --- a/apps/server/src/core/base/query-cache/base-query-cache.integration.spec.ts +++ b/apps/server/src/core/base/query-cache/base-query-cache.integration.spec.ts @@ -569,3 +569,200 @@ describeIntegration('BaseQueryCacheService integration', () => { 60_000, ); }); + +describeIntegration('BaseQueryCacheService warm-up on boot', () => { + @Injectable() + class WarmUpEnvService { + getDatabaseURL() { + return INTEGRATION_DB_URL!; + } + getDatabaseMaxPool() { + return 5; + } + getNodeEnv() { + return 'test'; + } + getBaseQueryCacheEnabled() { + return true; + } + getBaseQueryCacheMinRows() { + return 100; + } + getBaseQueryCacheMaxCollections() { + return 5; + } + getBaseQueryCacheWarmTopN() { + return 5; + } + getRedisUrl() { + return REDIS_URL; + } + } + + async function buildModule(): Promise { + const moduleRef = await Test.createTestingModule({ + imports: [ + ConfigModule.forRoot({ isGlobal: true }), + KyselyModule.forRoot({ + dialect: new PostgresJSDialect({ + postgres: (postgres as any)( + normalizePostgresUrl(INTEGRATION_DB_URL!), + { + max: 5, + onnotice: () => {}, + types: { + bigint: { + to: 20, + from: [20, 1700], + serialize: (value: number) => value.toString(), + parse: (value: string) => Number.parseInt(value), + }, + }, + }, + ), + }), + plugins: [new CamelCasePlugin()], + }), + EventEmitterModule.forRoot(), + RedisModule.forRoot({ + readyLog: false, + config: { host: '127.0.0.1', port: 6379 }, + }), + ], + providers: [ + { provide: EnvironmentService, useClass: WarmUpEnvService }, + QueryCacheConfigProvider, + BaseRepo, + BasePropertyRepo, + BaseRowRepo, + BaseViewRepo, + CollectionLoader, + BaseQueryCacheService, + DbHandle, + ], + }).compile(); + await moduleRef.init(); + return moduleRef; + } + + let firstModule: TestingModule | null = null; + let secondModule: TestingModule | null = null; + let seededBaseId: string | null = null; + let redisReachable = false; + let probeRedis: Redis | null = null; + + beforeAll(async () => { + process.env.DATABASE_URL = INTEGRATION_DB_URL; + process.env.REDIS_URL = REDIS_URL; + redisReachable = await isRedisReachable(); + if (!redisReachable) return; + + probeRedis = new Redis(REDIS_URL, { maxRetriesPerRequest: 1 }); + // Scrub any stale state from prior runs so zrevrange returns only the + // ids this test records. + await probeRedis.del('base-query-cache:recent'); + + firstModule = await buildModule(); + const dbHandle = firstModule.get(DbHandle); + + const workspace = await dbHandle.db + .selectFrom('workspaces') + .select(['id']) + .limit(1) + .executeTakeFirstOrThrow(); + const space = await dbHandle.db + .selectFrom('spaces') + .select(['id']) + .where('workspaceId', '=', workspace.id) + .limit(1) + .executeTakeFirstOrThrow(); + const user = await dbHandle.db + .selectFrom('users') + .select('id') + .limit(1) + .executeTakeFirst(); + + const seed = await seedBase({ + db: dbHandle.db as any, + workspaceId: workspace.id, + spaceId: space.id, + creatorUserId: user?.id ?? null, + rows: 200, + name: `cache-warmup-${Date.now()}`, + }); + seededBaseId = seed.baseId; + }, 180_000); + + afterAll(async () => { + if (firstModule && seededBaseId) { + const dbHandle = firstModule.get(DbHandle); + await deleteSeededBase(dbHandle.db as any, seededBaseId); + } + if (firstModule) await firstModule.close(); + if (secondModule) await secondModule.close(); + if (probeRedis) { + try { + await probeRedis.del('base-query-cache:recent'); + } catch {} + probeRedis.disconnect(); + } + }, 60_000); + + it( + 'records access in redis and warms the collection on boot', + async () => { + if (!redisReachable) { + console.warn('Skipping warm-up test: Redis not reachable'); + return; + } + const baseId = seededBaseId!; + const cache = firstModule!.get(BaseQueryCacheService); + const basePropertyRepo = firstModule!.get(BasePropertyRepo); + const dbHandle = firstModule!.get(DbHandle); + + const workspace = await dbHandle.db + .selectFrom('workspaces') + .select(['id']) + .limit(1) + .executeTakeFirstOrThrow(); + const workspaceId = workspace.id; + + const properties = await basePropertyRepo.findByBaseId(baseId); + const schema: PropertySchema = new Map(properties.map((p) => [p.id, p])); + + await cache.list(baseId, workspaceId, { + schema, + pagination: { limit: 10 } as any, + }); + + // recordAccess is fire-and-forget; give the ZADD time to round-trip. + await new Promise((r) => setTimeout(r, 200)); + + const recent = await probeRedis!.zrevrange( + 'base-query-cache:recent', + 0, + 0, + ); + expect(recent).toEqual([baseId]); + + // Simulate a fresh boot: close the current service, build a new module, + // and assert warm-up populates the collection without calling list(). + await firstModule!.close(); + firstModule = null; + + secondModule = await buildModule(); + const cache2 = secondModule.get(BaseQueryCacheService); + + // onApplicationBootstrap is called by moduleRef.init() above; but to be + // explicit about the warm-up path we assert residency directly. + expect(cache2.isResident(baseId)).toBe(true); + + const page = await cache2.list(baseId, workspaceId, { + schema, + pagination: { limit: 10 } as any, + }); + expect(page.items.length).toBe(10); + }, + 120_000, + ); +}); diff --git a/apps/server/src/core/base/query-cache/base-query-cache.service.ts b/apps/server/src/core/base/query-cache/base-query-cache.service.ts index 8af61581..0faebc40 100644 --- a/apps/server/src/core/base/query-cache/base-query-cache.service.ts +++ b/apps/server/src/core/base/query-cache/base-query-cache.service.ts @@ -3,7 +3,10 @@ import { Logger, OnApplicationBootstrap, OnModuleDestroy, + Optional, } from '@nestjs/common'; +import { RedisService } from '@nestjs-labs/nestjs-ioredis'; +import type { Redis } from 'ioredis'; import { BaseRepo } from '@docmost/db/repos/base/base.repo'; import { BaseRow } from '@docmost/db/types/entity.types'; import { @@ -51,13 +54,68 @@ export class BaseQueryCacheService private readonly configProvider: QueryCacheConfigProvider, private readonly baseRepo: BaseRepo, private readonly collectionLoader: CollectionLoader, + @Optional() private readonly redisService: RedisService | null = null, ) {} async onApplicationBootstrap(): Promise { - const { enabled } = this.configProvider.config; - this.logger.log( - `BaseQueryCacheService bootstrapped (enabled=${enabled}).`, - ); + const { enabled, warmTopN } = this.configProvider.config; + if (!enabled) return; + const redis = this.tryGetRedisClient(); + if (!redis) return; + try { + const ids = await redis.zrevrange( + 'base-query-cache:recent', + 0, + warmTopN - 1, + ); + for (const baseId of ids) { + try { + const base = await this.baseRepo.findById(baseId); + if (!base) continue; + await this.ensureLoaded(baseId, base.workspaceId); + } catch (err) { + this.logger.debug( + `warm-up skipped ${baseId}: ${(err as Error).message}`, + ); + } + } + this.logger.log(`Warmed ${ids.length} collections on boot`); + } catch (err) { + const error = err as Error; + this.logger.warn(`Warm-up failed: ${error.message}`); + if (error.stack) this.logger.warn(error.stack); + } + } + + private tryGetRedisClient(): Redis | null { + if (!this.redisService) return null; + try { + return this.redisService.getOrNil(); + } catch { + return null; + } + } + + private recordAccess(baseId: string): void { + if (!this.configProvider.config.enabled) return; + const redis = this.tryGetRedisClient(); + if (!redis) return; + const nowMs = Date.now(); + const maxKeep = this.configProvider.config.maxCollections * 10; + void (async () => { + try { + await redis.zadd('base-query-cache:recent', nowMs, baseId); + await redis.zremrangebyrank( + 'base-query-cache:recent', + 0, + -(maxKeep + 1), + ); + } catch (err) { + this.logger.debug( + `recordAccess failed for ${baseId}: ${(err as Error).message}`, + ); + } + })(); } async onModuleDestroy(): Promise { @@ -308,6 +366,7 @@ export class BaseQueryCacheService if (existing && existing.schemaVersion === freshVersion) { existing.lastAccessedAt = Date.now(); + this.recordAccess(baseId); return existing; } @@ -317,7 +376,11 @@ export class BaseQueryCacheService } const inFlight = this.inFlightLoads.get(baseId); - if (inFlight) return inFlight; + if (inFlight) { + const loaded = await inFlight; + this.recordAccess(baseId); + return loaded; + } const promise = (async () => { try { @@ -333,7 +396,9 @@ export class BaseQueryCacheService } })(); this.inFlightLoads.set(baseId, promise); - return promise; + const loaded = await promise; + this.recordAccess(baseId); + return loaded; } private evictLru(): void {