feat(server): warm duckdb collections on boot from redis recent-access set

This commit is contained in:
Philipinho
2026-04-19 22:16:20 +01:00
parent c9adf84260
commit 4636af3870
2 changed files with 268 additions and 6 deletions
@@ -569,3 +569,200 @@ describeIntegration('BaseQueryCacheService integration', () => {
60_000,
);
});
describeIntegration('BaseQueryCacheService warm-up on boot', () => {
@Injectable()
class WarmUpEnvService {
getDatabaseURL() {
return INTEGRATION_DB_URL!;
}
getDatabaseMaxPool() {
return 5;
}
getNodeEnv() {
return 'test';
}
getBaseQueryCacheEnabled() {
return true;
}
getBaseQueryCacheMinRows() {
return 100;
}
getBaseQueryCacheMaxCollections() {
return 5;
}
getBaseQueryCacheWarmTopN() {
return 5;
}
getRedisUrl() {
return REDIS_URL;
}
}
async function buildModule(): Promise<TestingModule> {
const moduleRef = await Test.createTestingModule({
imports: [
ConfigModule.forRoot({ isGlobal: true }),
KyselyModule.forRoot({
dialect: new PostgresJSDialect({
postgres: (postgres as any)(
normalizePostgresUrl(INTEGRATION_DB_URL!),
{
max: 5,
onnotice: () => {},
types: {
bigint: {
to: 20,
from: [20, 1700],
serialize: (value: number) => value.toString(),
parse: (value: string) => Number.parseInt(value),
},
},
},
),
}),
plugins: [new CamelCasePlugin()],
}),
EventEmitterModule.forRoot(),
RedisModule.forRoot({
readyLog: false,
config: { host: '127.0.0.1', port: 6379 },
}),
],
providers: [
{ provide: EnvironmentService, useClass: WarmUpEnvService },
QueryCacheConfigProvider,
BaseRepo,
BasePropertyRepo,
BaseRowRepo,
BaseViewRepo,
CollectionLoader,
BaseQueryCacheService,
DbHandle,
],
}).compile();
await moduleRef.init();
return moduleRef;
}
let firstModule: TestingModule | null = null;
let secondModule: TestingModule | null = null;
let seededBaseId: string | null = null;
let redisReachable = false;
let probeRedis: Redis | null = null;
beforeAll(async () => {
process.env.DATABASE_URL = INTEGRATION_DB_URL;
process.env.REDIS_URL = REDIS_URL;
redisReachable = await isRedisReachable();
if (!redisReachable) return;
probeRedis = new Redis(REDIS_URL, { maxRetriesPerRequest: 1 });
// Scrub any stale state from prior runs so zrevrange returns only the
// ids this test records.
await probeRedis.del('base-query-cache:recent');
firstModule = await buildModule();
const dbHandle = firstModule.get(DbHandle);
const workspace = await dbHandle.db
.selectFrom('workspaces')
.select(['id'])
.limit(1)
.executeTakeFirstOrThrow();
const space = await dbHandle.db
.selectFrom('spaces')
.select(['id'])
.where('workspaceId', '=', workspace.id)
.limit(1)
.executeTakeFirstOrThrow();
const user = await dbHandle.db
.selectFrom('users')
.select('id')
.limit(1)
.executeTakeFirst();
const seed = await seedBase({
db: dbHandle.db as any,
workspaceId: workspace.id,
spaceId: space.id,
creatorUserId: user?.id ?? null,
rows: 200,
name: `cache-warmup-${Date.now()}`,
});
seededBaseId = seed.baseId;
}, 180_000);
afterAll(async () => {
if (firstModule && seededBaseId) {
const dbHandle = firstModule.get(DbHandle);
await deleteSeededBase(dbHandle.db as any, seededBaseId);
}
if (firstModule) await firstModule.close();
if (secondModule) await secondModule.close();
if (probeRedis) {
try {
await probeRedis.del('base-query-cache:recent');
} catch {}
probeRedis.disconnect();
}
}, 60_000);
it(
'records access in redis and warms the collection on boot',
async () => {
if (!redisReachable) {
console.warn('Skipping warm-up test: Redis not reachable');
return;
}
const baseId = seededBaseId!;
const cache = firstModule!.get(BaseQueryCacheService);
const basePropertyRepo = firstModule!.get(BasePropertyRepo);
const dbHandle = firstModule!.get(DbHandle);
const workspace = await dbHandle.db
.selectFrom('workspaces')
.select(['id'])
.limit(1)
.executeTakeFirstOrThrow();
const workspaceId = workspace.id;
const properties = await basePropertyRepo.findByBaseId(baseId);
const schema: PropertySchema = new Map(properties.map((p) => [p.id, p]));
await cache.list(baseId, workspaceId, {
schema,
pagination: { limit: 10 } as any,
});
// recordAccess is fire-and-forget; give the ZADD time to round-trip.
await new Promise((r) => setTimeout(r, 200));
const recent = await probeRedis!.zrevrange(
'base-query-cache:recent',
0,
0,
);
expect(recent).toEqual([baseId]);
// Simulate a fresh boot: close the current service, build a new module,
// and assert warm-up populates the collection without calling list().
await firstModule!.close();
firstModule = null;
secondModule = await buildModule();
const cache2 = secondModule.get(BaseQueryCacheService);
// onApplicationBootstrap is called by moduleRef.init() above; but to be
// explicit about the warm-up path we assert residency directly.
expect(cache2.isResident(baseId)).toBe(true);
const page = await cache2.list(baseId, workspaceId, {
schema,
pagination: { limit: 10 } as any,
});
expect(page.items.length).toBe(10);
},
120_000,
);
});
@@ -3,7 +3,10 @@ import {
Logger,
OnApplicationBootstrap,
OnModuleDestroy,
Optional,
} from '@nestjs/common';
import { RedisService } from '@nestjs-labs/nestjs-ioredis';
import type { Redis } from 'ioredis';
import { BaseRepo } from '@docmost/db/repos/base/base.repo';
import { BaseRow } from '@docmost/db/types/entity.types';
import {
@@ -51,13 +54,68 @@ export class BaseQueryCacheService
private readonly configProvider: QueryCacheConfigProvider,
private readonly baseRepo: BaseRepo,
private readonly collectionLoader: CollectionLoader,
@Optional() private readonly redisService: RedisService | null = null,
) {}
async onApplicationBootstrap(): Promise<void> {
const { enabled } = this.configProvider.config;
this.logger.log(
`BaseQueryCacheService bootstrapped (enabled=${enabled}).`,
);
const { enabled, warmTopN } = this.configProvider.config;
if (!enabled) return;
const redis = this.tryGetRedisClient();
if (!redis) return;
try {
const ids = await redis.zrevrange(
'base-query-cache:recent',
0,
warmTopN - 1,
);
for (const baseId of ids) {
try {
const base = await this.baseRepo.findById(baseId);
if (!base) continue;
await this.ensureLoaded(baseId, base.workspaceId);
} catch (err) {
this.logger.debug(
`warm-up skipped ${baseId}: ${(err as Error).message}`,
);
}
}
this.logger.log(`Warmed ${ids.length} collections on boot`);
} catch (err) {
const error = err as Error;
this.logger.warn(`Warm-up failed: ${error.message}`);
if (error.stack) this.logger.warn(error.stack);
}
}
private tryGetRedisClient(): Redis | null {
if (!this.redisService) return null;
try {
return this.redisService.getOrNil();
} catch {
return null;
}
}
private recordAccess(baseId: string): void {
if (!this.configProvider.config.enabled) return;
const redis = this.tryGetRedisClient();
if (!redis) return;
const nowMs = Date.now();
const maxKeep = this.configProvider.config.maxCollections * 10;
void (async () => {
try {
await redis.zadd('base-query-cache:recent', nowMs, baseId);
await redis.zremrangebyrank(
'base-query-cache:recent',
0,
-(maxKeep + 1),
);
} catch (err) {
this.logger.debug(
`recordAccess failed for ${baseId}: ${(err as Error).message}`,
);
}
})();
}
async onModuleDestroy(): Promise<void> {
@@ -308,6 +366,7 @@ export class BaseQueryCacheService
if (existing && existing.schemaVersion === freshVersion) {
existing.lastAccessedAt = Date.now();
this.recordAccess(baseId);
return existing;
}
@@ -317,7 +376,11 @@ export class BaseQueryCacheService
}
const inFlight = this.inFlightLoads.get(baseId);
if (inFlight) return inFlight;
if (inFlight) {
const loaded = await inFlight;
this.recordAccess(baseId);
return loaded;
}
const promise = (async () => {
try {
@@ -333,7 +396,9 @@ export class BaseQueryCacheService
}
})();
this.inFlightLoads.set(baseId, promise);
return promise;
const loaded = await promise;
this.recordAccess(baseId);
return loaded;
}
private evictLru(): void {