feat(server): propagate row mutations to duckdb cache via redis pubsub

This commit is contained in:
Philipinho
2026-04-19 22:00:37 +01:00
parent 7534b44e6e
commit df22efb290
5 changed files with 644 additions and 33 deletions
@@ -5,6 +5,9 @@ import { CamelCasePlugin } from 'kysely';
import { PostgresJSDialect } from 'kysely-postgres-js'; import { PostgresJSDialect } from 'kysely-postgres-js';
import * as postgres from 'postgres'; import * as postgres from 'postgres';
import { Injectable } from '@nestjs/common'; import { Injectable } from '@nestjs/common';
import { EventEmitter2, EventEmitterModule } from '@nestjs/event-emitter';
import { RedisModule } from '@nestjs-labs/nestjs-ioredis';
import Redis from 'ioredis';
import { BaseRepo } from '@docmost/db/repos/base/base.repo'; import { BaseRepo } from '@docmost/db/repos/base/base.repo';
import { BasePropertyRepo } from '@docmost/db/repos/base/base-property.repo'; import { BasePropertyRepo } from '@docmost/db/repos/base/base-property.repo';
import { BaseRowRepo } from '@docmost/db/repos/base/base-row.repo'; import { BaseRowRepo } from '@docmost/db/repos/base/base-row.repo';
@@ -13,11 +16,17 @@ import { KyselyDB } from '@docmost/db/types/kysely.types';
import { BaseQueryCacheService } from './base-query-cache.service'; import { BaseQueryCacheService } from './base-query-cache.service';
import { QueryCacheConfigProvider } from './query-cache.config'; import { QueryCacheConfigProvider } from './query-cache.config';
import { CollectionLoader } from './collection-loader'; import { CollectionLoader } from './collection-loader';
import { BaseQueryCacheWriteConsumer } from './base-query-cache.write-consumer';
import { BaseQueryCacheSubscriber } from './base-query-cache.subscriber';
import { EnvironmentService } from '../../../integrations/environment/environment.service'; import { EnvironmentService } from '../../../integrations/environment/environment.service';
import { seedBase, deleteSeededBase } from './testing/seed-base'; import { seedBase, deleteSeededBase } from './testing/seed-base';
import { PropertySchema } from '../engine'; import { PropertySchema } from '../engine';
import { EventName } from '../../../common/events/event.contants';
import { BaseRowUpdatedEvent } from '../events/base-events';
import { ChangeEnvelope } from './query-cache.types';
const INTEGRATION_DB_URL = process.env.INTEGRATION_DB_URL; const INTEGRATION_DB_URL = process.env.INTEGRATION_DB_URL;
const REDIS_URL = process.env.REDIS_URL ?? 'redis://localhost:6379';
// Minimal EnvironmentService stand-in that only implements the methods used // Minimal EnvironmentService stand-in that only implements the methods used
// by query-cache and the repos we touch. // by query-cache and the repos we touch.
@@ -44,6 +53,9 @@ class FakeEnvService {
getBaseQueryCacheWarmTopN() { getBaseQueryCacheWarmTopN() {
return 0; return 0;
} }
getRedisUrl() {
return REDIS_URL;
}
} }
@Injectable() @Injectable()
@@ -65,63 +77,105 @@ function normalizePostgresUrl(url: string): string {
const describeIntegration = INTEGRATION_DB_URL ? describe : describe.skip; const describeIntegration = INTEGRATION_DB_URL ? describe : describe.skip;
/**
 * Probe whether a Redis server answers at REDIS_URL. Uses a throwaway,
 * lazily-connected client with retries disabled so an absent server fails
 * fast instead of stalling the suite. The probe connection is always torn
 * down, whatever the outcome.
 */
async function isRedisReachable(): Promise<boolean> {
  const probeOptions = {
    lazyConnect: true,
    maxRetriesPerRequest: 1,
    retryStrategy: () => null,
  };
  const probe = new Redis(REDIS_URL, probeOptions);
  try {
    await probe.connect();
    await probe.ping();
  } catch {
    return false;
  } finally {
    probe.disconnect();
  }
  return true;
}
describeIntegration('BaseQueryCacheService integration', () => { describeIntegration('BaseQueryCacheService integration', () => {
let moduleRef: TestingModule; let moduleRef: TestingModule;
let cache: BaseQueryCacheService; let cache: BaseQueryCacheService;
let baseRowRepo: BaseRowRepo; let baseRowRepo: BaseRowRepo;
let basePropertyRepo: BasePropertyRepo; let basePropertyRepo: BasePropertyRepo;
let dbHandle: DbHandle; let dbHandle: DbHandle;
let eventEmitter: EventEmitter2;
let seededBaseId: string | null = null; let seededBaseId: string | null = null;
let workspaceId: string; let workspaceId: string;
let spaceId: string; let spaceId: string;
let creatorUserId: string | null; let creatorUserId: string | null;
let redisReachable = false;
beforeAll(async () => { beforeAll(async () => {
process.env.DATABASE_URL = INTEGRATION_DB_URL; process.env.DATABASE_URL = INTEGRATION_DB_URL;
process.env.REDIS_URL = REDIS_URL;
process.env.BASE_QUERY_CACHE_ENABLED = 'true'; process.env.BASE_QUERY_CACHE_ENABLED = 'true';
process.env.BASE_QUERY_CACHE_MIN_ROWS = '100'; process.env.BASE_QUERY_CACHE_MIN_ROWS = '100';
moduleRef = await Test.createTestingModule({ redisReachable = await isRedisReachable();
imports: [
ConfigModule.forRoot({ isGlobal: true }), const imports: any[] = [
KyselyModule.forRoot({ ConfigModule.forRoot({ isGlobal: true }),
dialect: new PostgresJSDialect({ KyselyModule.forRoot({
postgres: (postgres as any)( dialect: new PostgresJSDialect({
normalizePostgresUrl(INTEGRATION_DB_URL!), postgres: (postgres as any)(
{ normalizePostgresUrl(INTEGRATION_DB_URL!),
max: 5, {
onnotice: () => {}, max: 5,
types: { onnotice: () => {},
bigint: { types: {
to: 20, bigint: {
from: [20, 1700], to: 20,
serialize: (value: number) => value.toString(), from: [20, 1700],
parse: (value: string) => Number.parseInt(value), serialize: (value: number) => value.toString(),
}, parse: (value: string) => Number.parseInt(value),
}, },
}, },
), },
}), ),
plugins: [new CamelCasePlugin()],
}), }),
], plugins: [new CamelCasePlugin()],
providers: [ }),
{ provide: EnvironmentService, useClass: FakeEnvService }, EventEmitterModule.forRoot(),
QueryCacheConfigProvider, ];
BaseRepo,
BasePropertyRepo, const providers: any[] = [
BaseRowRepo, { provide: EnvironmentService, useClass: FakeEnvService },
BaseViewRepo, QueryCacheConfigProvider,
CollectionLoader, BaseRepo,
BaseQueryCacheService, BasePropertyRepo,
DbHandle, BaseRowRepo,
], BaseViewRepo,
CollectionLoader,
BaseQueryCacheService,
DbHandle,
];
if (redisReachable) {
imports.push(
RedisModule.forRoot({
readyLog: false,
config: { host: '127.0.0.1', port: 6379 },
}),
);
providers.push(BaseQueryCacheWriteConsumer, BaseQueryCacheSubscriber);
}
moduleRef = await Test.createTestingModule({
imports,
providers,
}).compile(); }).compile();
if (redisReachable) {
await moduleRef.init();
}
cache = moduleRef.get(BaseQueryCacheService); cache = moduleRef.get(BaseQueryCacheService);
baseRowRepo = moduleRef.get(BaseRowRepo); baseRowRepo = moduleRef.get(BaseRowRepo);
basePropertyRepo = moduleRef.get(BasePropertyRepo); basePropertyRepo = moduleRef.get(BasePropertyRepo);
dbHandle = moduleRef.get(DbHandle); dbHandle = moduleRef.get(DbHandle);
eventEmitter = moduleRef.get(EventEmitter2);
const workspace = await dbHandle.db const workspace = await dbHandle.db
.selectFrom('workspaces') .selectFrom('workspaces')
@@ -219,4 +273,125 @@ describeIntegration('BaseQueryCacheService integration', () => {
}, },
60_000, 60_000,
); );
it(
  'applyChange upsert reflects in subsequent list reads',
  async () => {
    const base = seededBaseId!;
    const props = await basePropertyRepo.findByBaseId(base);
    const schema: PropertySchema = new Map(props.map((p) => [p.id, p]));

    const estimate = props.find((p) => p.name === 'Estimate');
    if (!estimate) throw new Error('Estimate property not found');

    // Loading a single page forces the collection to become resident.
    const seedPage = await cache.list(base, workspaceId, {
      schema,
      pagination: { limit: 1 } as any,
    });
    expect(seedPage.items.length).toBe(1);
    const rowId = seedPage.items[0].id;

    // Patch the row directly in Postgres, then hand the matching envelope
    // to the cache service's public API (pubsub-free — deterministic).
    const patchedValue = 424242;
    const current = await baseRowRepo.findById(rowId, { workspaceId });
    if (!current) throw new Error('Row not found');
    const patchedCells = {
      ...(current.cells as Record<string, unknown>),
      [estimate.id]: patchedValue,
    };
    await dbHandle.db
      .updateTable('baseRows')
      .set({ cells: patchedCells as any })
      .where('id', '=', rowId)
      .where('workspaceId', '=', workspaceId)
      .execute();

    const change: ChangeEnvelope = {
      kind: 'row-upsert',
      baseId: base,
      row: { ...current, cells: patchedCells } as unknown as Record<
        string,
        unknown
      >,
    };
    await cache.applyChange(change);

    // Read back through DuckDB with a filter that should only match the
    // freshly-patched value.
    const filtered = await cache.list(base, workspaceId, {
      schema,
      filter: {
        propertyId: estimate.id,
        op: 'eq',
        value: patchedValue,
      } as any,
      pagination: { limit: 5 } as any,
    });
    expect(filtered.items.map((r) => r.id)).toContain(rowId);
  },
  60_000,
);
it(
  'pubsub round-trip: BASE_ROW_UPDATED event propagates to DuckDB',
  async () => {
    if (!redisReachable) {
      console.warn('Skipping pubsub round-trip: Redis not reachable');
      return;
    }
    const baseId = seededBaseId!;
    const properties = await basePropertyRepo.findByBaseId(baseId);
    const schema: PropertySchema = new Map(properties.map((p) => [p.id, p]));

    const estimateProp = properties.find((p) => p.name === 'Estimate');
    if (!estimateProp) throw new Error('Estimate property not found');

    // Force the collection to load.
    const firstPage = await cache.list(baseId, workspaceId, {
      schema,
      pagination: { limit: 1 } as any,
    });
    expect(firstPage.items.length).toBe(1);
    const targetRowId = firstPage.items[0].id;

    const nextEstimate = 999_001;
    const pgRow = await baseRowRepo.findById(targetRowId, { workspaceId });
    if (!pgRow) throw new Error('Row not found');
    const newCells = {
      ...(pgRow.cells as Record<string, unknown>),
      [estimateProp.id]: nextEstimate,
    };
    await dbHandle.db
      .updateTable('baseRows')
      .set({ cells: newCells as any })
      .where('id', '=', targetRowId)
      .where('workspaceId', '=', workspaceId)
      .execute();

    const event: BaseRowUpdatedEvent = {
      baseId,
      workspaceId,
      actorId: null,
      requestId: null,
      rowId: targetRowId,
      patch: { [estimateProp.id]: nextEstimate },
      updatedCells: { [estimateProp.id]: nextEstimate },
    };
    eventEmitter.emit(EventName.BASE_ROW_UPDATED, event);

    // Poll for the Redis round-trip instead of sleeping a fixed 500ms: a
    // busy CI box can take longer than any fixed delay (flaky failure),
    // while a fast machine finishes sooner (wasted wall time). Give up
    // after 10s and let the final assertion report the failure.
    const deadline = Date.now() + 10_000;
    let ids: string[] = [];
    for (;;) {
      const page = await cache.list(baseId, workspaceId, {
        schema,
        filter: {
          propertyId: estimateProp.id,
          op: 'eq',
          value: nextEstimate,
        } as any,
        pagination: { limit: 5 } as any,
      });
      ids = page.items.map((r) => r.id);
      if (ids.includes(targetRowId) || Date.now() >= deadline) break;
      await new Promise((r) => setTimeout(r, 100));
    }
    expect(ids).toContain(targetRowId);
  },
  60_000,
);
}); });
@@ -24,7 +24,12 @@ import {
import { QueryCacheConfigProvider } from './query-cache.config'; import { QueryCacheConfigProvider } from './query-cache.config';
import { CollectionLoader } from './collection-loader'; import { CollectionLoader } from './collection-loader';
import { buildDuckDbListQuery } from './duckdb-query-builder'; import { buildDuckDbListQuery } from './duckdb-query-builder';
import { ColumnSpec, LoadedCollection } from './query-cache.types'; import { BasePropertyType } from '../base.schemas';
import {
ChangeEnvelope,
ColumnSpec,
LoadedCollection,
} from './query-cache.types';
export type CacheListOpts = { export type CacheListOpts = {
filter?: FilterNode; filter?: FilterNode;
@@ -165,6 +170,117 @@ export class BaseQueryCacheService
this.collections.delete(baseId); this.collections.delete(baseId);
} }
/*
 * Apply a change envelope received from Redis pub/sub to the local
 * collection (if any). Envelopes for bases not resident on this node are
 * ignored — the next `list` call loads them fresh from Postgres. Should
 * any patch step throw (e.g. schema drift between this node and the
 * publisher), the collection is eagerly invalidated so the next `list`
 * rebuilds cleanly instead of serving partial state.
 */
async applyChange(change: ChangeEnvelope): Promise<void> {
  const resident = this.collections.get(change.baseId);
  if (!resident) return;
  try {
    if (change.kind === 'schema-invalidate') {
      // Only invalidate on strictly newer schema versions; replayed or
      // stale notifications are ignored.
      if (change.schemaVersion > resident.schemaVersion) {
        await this.invalidate(change.baseId);
      }
    } else if (change.kind === 'row-upsert') {
      await this.upsertRow(resident, change.row);
    } else if (change.kind === 'row-delete') {
      await this.deleteRow(resident, change.rowId);
    } else if (change.kind === 'rows-delete') {
      for (const id of change.rowIds) {
        await this.deleteRow(resident, id);
      }
    } else if (change.kind === 'row-reorder') {
      await this.updatePosition(resident, change.rowId, change.position);
    }
  } catch (err) {
    const error = err as Error;
    this.logger.warn(
      `applyChange failed for ${change.baseId}; invalidating: ${error.message}`,
    );
    if (error.stack) this.logger.warn(error.stack);
    await this.invalidate(change.baseId);
  }
}
/*
 * Insert-or-replace one row in the resident DuckDB `rows` table. Columns
 * are bound positionally in the same order as `collection.columns`, so the
 * placeholder list and the bind loop below must stay in lockstep.
 */
private async upsertRow(
  collection: LoadedCollection,
  row: Record<string, unknown>,
): Promise<void> {
  const specs = collection.columns;
  const columnList = specs.map((s) => quoteIdent(s.column)).join(', ');
  const placeholders = specs.map(() => '?').join(', ');
  const sql = `INSERT OR REPLACE INTO rows (${columnList}) VALUES (${placeholders})`;
  const prepared = await collection.connection.prepare(sql);
  for (let i = 0; i < specs.length; i++) {
    const spec = specs[i];
    // DuckDB prepared-statement parameters are 1-indexed.
    const oneBased = i + 1;
    const raw = readFromRowEvent(row, spec);
    // `== null` deliberately matches both null and undefined.
    if (raw == null) {
      prepared.bindNull(oneBased);
      continue;
    }
    switch (spec.ddlType) {
      case 'VARCHAR':
        prepared.bindVarchar(oneBased, String(raw));
        break;
      case 'DOUBLE': {
        // Non-numeric payloads degrade to NULL rather than storing NaN.
        const n = Number(raw);
        if (Number.isNaN(n)) prepared.bindNull(oneBased);
        else prepared.bindDouble(oneBased, n);
        break;
      }
      case 'BOOLEAN':
        prepared.bindBoolean(oneBased, Boolean(raw));
        break;
      case 'TIMESTAMPTZ': {
        // Values may arrive as Date (in-process) or as a string that was
        // serialized over Redis; unparsable dates become NULL.
        const d = raw instanceof Date ? raw : new Date(String(raw));
        if (Number.isNaN(d.getTime())) prepared.bindNull(oneBased);
        else prepared.bindVarchar(oneBased, d.toISOString());
        break;
      }
      case 'JSON':
        prepared.bindVarchar(oneBased, JSON.stringify(raw));
        break;
      // NOTE(review): a ddlType outside this set leaves its parameter
      // unbound, so run() should fail and applyChange falls back to a full
      // invalidate — confirm that is the intended failure mode.
    }
  }
  await prepared.run();
}
/*
 * Remove a single row from the resident DuckDB table by primary key.
 */
private async deleteRow(
  collection: LoadedCollection,
  rowId: string,
): Promise<void> {
  const stmt = await collection.connection.prepare(
    'DELETE FROM rows WHERE id = ?',
  );
  stmt.bindVarchar(1, rowId);
  await stmt.run();
}
/*
 * Rewrite a row's position column in place (used for reorder envelopes).
 */
private async updatePosition(
  collection: LoadedCollection,
  rowId: string,
  position: string,
): Promise<void> {
  const stmt = await collection.connection.prepare(
    'UPDATE rows SET position = ? WHERE id = ?',
  );
  stmt.bindVarchar(1, position);
  stmt.bindVarchar(2, rowId);
  await stmt.run();
}
private async ensureLoaded( private async ensureLoaded(
baseId: string, baseId: string,
workspaceId: string, workspaceId: string,
@@ -304,3 +420,56 @@ function toDate(value: unknown): Date {
if (value instanceof Date) return value; if (value instanceof Date) return value;
return new Date(String(value)); return new Date(String(value));
} }
// System property type → system column on base_rows (mirrors the map in
// collection-loader.ts). Kept local to avoid a circular import; if the
// loader's map ever changes, this copy must be updated with it.
const SYSTEM_PROPERTY_COLUMN_LOOKUP: Record<string, string> = {
  [BasePropertyType.CREATED_AT]: 'createdAt',
  [BasePropertyType.LAST_EDITED_AT]: 'updatedAt',
  [BasePropertyType.LAST_EDITED_BY]: 'lastUpdatedById',
};
// Mirror of collection-loader's `readFromRow`, keyed off a generic event
// payload. Rows may arrive in camelCase JSON (EventEmitter / Redis wire
// shape) or in the repo shape straight from Kysely, so each system column
// tolerates both spellings via a nullish-coalescing fallback.
function readFromRowEvent(
  row: Record<string, unknown>,
  spec: ColumnSpec,
): unknown {
  const { column, property } = spec;

  if (column === 'id') return row.id;
  if (column === 'base_id') return row.baseId ?? row.base_id;
  if (column === 'workspace_id') return row.workspaceId ?? row.workspace_id;
  if (column === 'creator_id') return row.creatorId ?? row.creator_id;
  if (column === 'position') return row.position;
  if (column === 'created_at') return row.createdAt ?? row.created_at;
  if (column === 'updated_at') return row.updatedAt ?? row.updated_at;
  if (column === 'last_updated_by_id')
    return row.lastUpdatedById ?? row.last_updated_by_id;
  // Live rows never carry a deletion marker or precomputed search text.
  if (column === 'deleted_at') return null;
  if (column === 'search_text') return '';

  // Non-system columns resolve through the property schema.
  if (!property) return null;
  const sysColumn = SYSTEM_PROPERTY_COLUMN_LOOKUP[property.type];
  if (sysColumn) return row[sysColumn] ?? null;
  const cells = (row.cells as Record<string, unknown> | null) ?? {};
  return cells[property.id] ?? null;
}
// Quote a DuckDB identifier, doubling any embedded double quotes per the
// standard SQL identifier-escaping rule.
function quoteIdent(name: string): string {
  const escaped = name.split('"').join('""');
  return `"${escaped}"`;
}
@@ -0,0 +1,110 @@
import {
Injectable,
Logger,
OnApplicationBootstrap,
OnModuleDestroy,
} from '@nestjs/common';
import Redis from 'ioredis';
import { EnvironmentService } from '../../../integrations/environment/environment.service';
import {
createRetryStrategy,
parseRedisUrl,
} from '../../../common/helpers/utils';
import { QueryCacheConfigProvider } from './query-cache.config';
import { BaseQueryCacheService } from './base-query-cache.service';
import { ChangeEnvelope } from './query-cache.types';
// One channel per base; the wildcard lets a single psubscribe cover all of
// them.
const CHANNEL_PATTERN = 'base-query-cache:changes:*';

/*
 * Dedicated ioredis subscriber that forwards change envelopes to the local
 * BaseQueryCacheService. A separate connection is required because ioredis
 * puts subscribing clients into subscriber-only mode and the shared client
 * from RedisService is used for normal commands elsewhere in the app.
 * When the query-cache is disabled we do not open a Redis connection at all.
 */
@Injectable()
export class BaseQueryCacheSubscriber
  implements OnApplicationBootstrap, OnModuleDestroy
{
  private readonly logger = new Logger(BaseQueryCacheSubscriber.name);
  // Subscriber-mode connection; null before bootstrap and after shutdown.
  private client: Redis | null = null;

  constructor(
    private readonly configProvider: QueryCacheConfigProvider,
    private readonly env: EnvironmentService,
    private readonly cacheService: BaseQueryCacheService,
  ) {}

  /*
   * Open the subscriber connection and psubscribe to every base's change
   * channel. The pmessage handler is attached before psubscribe so nothing
   * published immediately after the subscription lands can be dropped. A
   * failed psubscribe is logged but not rethrown — the node simply runs
   * without cross-node cache sync.
   */
  async onApplicationBootstrap(): Promise<void> {
    if (!this.configProvider.config.enabled) return;
    const redisUrl = this.env.getRedisUrl();
    const { family } = parseRedisUrl(redisUrl);
    this.client = new Redis(redisUrl, {
      family,
      retryStrategy: createRetryStrategy(),
    });
    this.client.on('error', (err) => {
      this.logger.warn(`Subscriber client error: ${err.message}`);
    });
    this.client.on('pmessage', (_pattern, channel, message) => {
      // handleMessage is async; catch here so a rejection can never become
      // an unhandled promise rejection inside the event-emitter callback.
      this.handleMessage(channel, message).catch((err) => {
        const error = err as Error;
        this.logger.warn(
          `Unhandled error applying change from ${channel}: ${error.message}`,
        );
      });
    });
    try {
      await this.client.psubscribe(CHANNEL_PATTERN);
      this.logger.log(`Subscribed to ${CHANNEL_PATTERN}`);
    } catch (err) {
      const error = err as Error;
      this.logger.warn(`Failed to psubscribe: ${error.message}`);
    }
  }

  /*
   * Gracefully close the subscriber connection on shutdown. Errors are
   * logged rather than rethrown so teardown of other modules proceeds.
   */
  async onModuleDestroy(): Promise<void> {
    if (!this.client) return;
    try {
      await this.client.quit();
    } catch (err) {
      const error = err as Error;
      this.logger.warn(
        `Failed to close subscriber client cleanly: ${error.message}`,
      );
    }
    this.client = null;
  }

  /*
   * Decode one pub/sub message and hand it to the cache service. Malformed
   * JSON is dropped with a warning; applyChange failures are logged (with
   * stack when available) but never propagate back to the Redis client.
   */
  private async handleMessage(
    channel: string,
    message: string,
  ): Promise<void> {
    let envelope: ChangeEnvelope;
    try {
      envelope = JSON.parse(message) as ChangeEnvelope;
    } catch (err) {
      const error = err as Error;
      this.logger.warn(
        `Dropping malformed cache-change message on ${channel}: ${error.message}`,
      );
      return;
    }
    try {
      await this.cacheService.applyChange(envelope);
    } catch (err) {
      const error = err as Error;
      this.logger.warn(
        `applyChange failed for ${envelope.baseId}: ${error.message}`,
      );
      if (error.stack) this.logger.warn(error.stack);
    }
  }
}
@@ -0,0 +1,153 @@
import { Injectable, Logger } from '@nestjs/common';
import { OnEvent } from '@nestjs/event-emitter';
import { RedisService } from '@nestjs-labs/nestjs-ioredis';
import type { Redis } from 'ioredis';
import { EventName } from '../../../common/events/event.contants';
import { BaseRowRepo } from '@docmost/db/repos/base/base-row.repo';
import {
BasePropertyCreatedEvent,
BasePropertyDeletedEvent,
BasePropertyUpdatedEvent,
BaseRowCreatedEvent,
BaseRowDeletedEvent,
BaseRowReorderedEvent,
BaseRowUpdatedEvent,
BaseRowsDeletedEvent,
BaseSchemaBumpedEvent,
} from '../events/base-events';
import { QueryCacheConfigProvider } from './query-cache.config';
import { ChangeEnvelope } from './query-cache.types';
/*
 * Bridges in-process base domain events onto a Redis pub/sub channel so every
 * node running the query-cache can keep its resident DuckDB collections in
 * sync. Each base gets its own channel (`base-query-cache:changes:${baseId}`)
 * to keep pattern matching cheap. When the feature flag is off every handler
 * returns early, so we pay zero overhead.
 */
@Injectable()
export class BaseQueryCacheWriteConsumer {
  private readonly logger = new Logger(BaseQueryCacheWriteConsumer.name);
  // Shared command-mode client; publishing (unlike subscribing) does not put
  // the connection into subscriber-only mode, so reuse is safe.
  private readonly redis: Redis;

  constructor(
    private readonly redisService: RedisService,
    private readonly configProvider: QueryCacheConfigProvider,
    private readonly baseRowRepo: BaseRowRepo,
  ) {
    this.redis = this.redisService.getOrThrow();
  }

  // Broadcast a freshly-created row as an upsert envelope.
  @OnEvent(EventName.BASE_ROW_CREATED)
  async onRowCreated(e: BaseRowCreatedEvent): Promise<void> {
    if (!this.configProvider.config.enabled) return;
    await this.publish(e.baseId, {
      kind: 'row-upsert',
      baseId: e.baseId,
      row: e.row as unknown as Record<string, unknown>,
    });
  }

  // The update event only carries a patch, so re-read the full row from
  // Postgres and publish it whole; subscribers replace the cached row
  // wholesale. A row deleted between event and read is silently skipped.
  @OnEvent(EventName.BASE_ROW_UPDATED)
  async onRowUpdated(e: BaseRowUpdatedEvent): Promise<void> {
    if (!this.configProvider.config.enabled) return;
    const row = await this.baseRowRepo.findById(e.rowId, {
      workspaceId: e.workspaceId,
    });
    if (!row) return;
    await this.publish(e.baseId, {
      kind: 'row-upsert',
      baseId: e.baseId,
      row: row as unknown as Record<string, unknown>,
    });
  }

  // Single-row deletion.
  @OnEvent(EventName.BASE_ROW_DELETED)
  async onRowDeleted(e: BaseRowDeletedEvent): Promise<void> {
    if (!this.configProvider.config.enabled) return;
    await this.publish(e.baseId, {
      kind: 'row-delete',
      baseId: e.baseId,
      rowId: e.rowId,
    });
  }

  // Bulk deletion: one envelope carrying every deleted row id.
  @OnEvent(EventName.BASE_ROWS_DELETED)
  async onRowsDeleted(e: BaseRowsDeletedEvent): Promise<void> {
    if (!this.configProvider.config.enabled) return;
    await this.publish(e.baseId, {
      kind: 'rows-delete',
      baseId: e.baseId,
      rowIds: e.rowIds,
    });
  }

  // Position-only change; subscribers patch the position column in place.
  @OnEvent(EventName.BASE_ROW_REORDERED)
  async onRowReordered(e: BaseRowReorderedEvent): Promise<void> {
    if (!this.configProvider.config.enabled) return;
    await this.publish(e.baseId, {
      kind: 'row-reorder',
      baseId: e.baseId,
      rowId: e.rowId,
      position: e.position,
    });
  }

  // Explicit schema bump carries its own version; applyChange invalidates
  // only if this version is newer than the resident collection's.
  @OnEvent(EventName.BASE_SCHEMA_BUMPED)
  async onSchemaBumped(e: BaseSchemaBumpedEvent): Promise<void> {
    if (!this.configProvider.config.enabled) return;
    await this.publish(e.baseId, {
      kind: 'schema-invalidate',
      baseId: e.baseId,
      schemaVersion: e.schemaVersion,
    });
  }

  // Property updates carry a schemaVersion too; same comparison semantics
  // as BASE_SCHEMA_BUMPED.
  @OnEvent(EventName.BASE_PROPERTY_UPDATED)
  async onPropertyUpdated(e: BasePropertyUpdatedEvent): Promise<void> {
    if (!this.configProvider.config.enabled) return;
    await this.publish(e.baseId, {
      kind: 'schema-invalidate',
      baseId: e.baseId,
      schemaVersion: e.schemaVersion,
    });
  }

  @OnEvent(EventName.BASE_PROPERTY_CREATED)
  async onPropertyCreated(e: BasePropertyCreatedEvent): Promise<void> {
    if (!this.configProvider.config.enabled) return;
    // Property creation doesn't carry a schemaVersion in its payload;
    // publish Number.MAX_SAFE_INTEGER so applyChange — which invalidates
    // only when the incoming version is GREATER than the resident one —
    // always treats the resident collection as stale and rebuilds it.
    await this.publish(e.baseId, {
      kind: 'schema-invalidate',
      baseId: e.baseId,
      schemaVersion: Number.MAX_SAFE_INTEGER,
    });
  }

  // Same versionless-payload situation as property creation: force a
  // rebuild by publishing the maximum possible version.
  @OnEvent(EventName.BASE_PROPERTY_DELETED)
  async onPropertyDeleted(e: BasePropertyDeletedEvent): Promise<void> {
    if (!this.configProvider.config.enabled) return;
    await this.publish(e.baseId, {
      kind: 'schema-invalidate',
      baseId: e.baseId,
      schemaVersion: Number.MAX_SAFE_INTEGER,
    });
  }

  /*
   * Serialize and publish one envelope on the base's channel. Failures are
   * logged, never thrown — a missed publish only leaves remote caches stale
   * until their next invalidation or reload.
   */
  private async publish(
    baseId: string,
    envelope: ChangeEnvelope,
  ): Promise<void> {
    const channel = `base-query-cache:changes:${baseId}`;
    try {
      await this.redis.publish(channel, JSON.stringify(envelope));
    } catch (err) {
      const error = err as Error;
      this.logger.warn(
        `Failed to publish cache change for ${baseId}: ${error.message}`,
      );
    }
  }
}
@@ -3,6 +3,8 @@ import { QueryCacheConfigProvider } from './query-cache.config';
import { BaseQueryCacheService } from './base-query-cache.service'; import { BaseQueryCacheService } from './base-query-cache.service';
import { BaseQueryRouter } from './base-query-router'; import { BaseQueryRouter } from './base-query-router';
import { CollectionLoader } from './collection-loader'; import { CollectionLoader } from './collection-loader';
import { BaseQueryCacheWriteConsumer } from './base-query-cache.write-consumer';
import { BaseQueryCacheSubscriber } from './base-query-cache.subscriber';
@Module({ @Module({
providers: [ providers: [
@@ -10,6 +12,8 @@ import { CollectionLoader } from './collection-loader';
BaseQueryCacheService, BaseQueryCacheService,
BaseQueryRouter, BaseQueryRouter,
CollectionLoader, CollectionLoader,
BaseQueryCacheWriteConsumer,
BaseQueryCacheSubscriber,
], ],
exports: [BaseQueryCacheService, BaseQueryRouter, QueryCacheConfigProvider], exports: [BaseQueryCacheService, BaseQueryRouter, QueryCacheConfigProvider],
}) })