From df22efb290ab4be83465ae9de6a20a199db99aca Mon Sep 17 00:00:00 2001
From: Philipinho <16838612+Philipinho@users.noreply.github.com>
Date: Sun, 19 Apr 2026 22:00:37 +0100
Subject: [PATCH] feat(server): propagate row mutations to duckdb cache via
 redis pubsub

---
 .../base-query-cache.integration.spec.ts      | 239 +++++++++++++++---
 .../query-cache/base-query-cache.service.ts   | 171 ++++++++++++-
 .../base-query-cache.subscriber.ts            | 110 ++++++++
 .../base-query-cache.write-consumer.ts        | 153 +++++++++++
 .../base/query-cache/query-cache.module.ts    |   4 +
 5 files changed, 644 insertions(+), 33 deletions(-)
 create mode 100644 apps/server/src/core/base/query-cache/base-query-cache.subscriber.ts
 create mode 100644 apps/server/src/core/base/query-cache/base-query-cache.write-consumer.ts

diff --git a/apps/server/src/core/base/query-cache/base-query-cache.integration.spec.ts b/apps/server/src/core/base/query-cache/base-query-cache.integration.spec.ts
index 67bafc4d..b1943cd2 100644
--- a/apps/server/src/core/base/query-cache/base-query-cache.integration.spec.ts
+++ b/apps/server/src/core/base/query-cache/base-query-cache.integration.spec.ts
@@ -5,6 +5,9 @@ import { CamelCasePlugin } from 'kysely';
 import { PostgresJSDialect } from 'kysely-postgres-js';
 import * as postgres from 'postgres';
 import { Injectable } from '@nestjs/common';
+import { EventEmitter2, EventEmitterModule } from '@nestjs/event-emitter';
+import { RedisModule } from '@nestjs-labs/nestjs-ioredis';
+import Redis from 'ioredis';
 import { BaseRepo } from '@docmost/db/repos/base/base.repo';
 import { BasePropertyRepo } from '@docmost/db/repos/base/base-property.repo';
 import { BaseRowRepo } from '@docmost/db/repos/base/base-row.repo';
@@ -13,11 +16,17 @@ import { KyselyDB } from '@docmost/db/types/kysely.types';
 import { BaseQueryCacheService } from './base-query-cache.service';
 import { QueryCacheConfigProvider } from './query-cache.config';
 import { CollectionLoader } from './collection-loader';
+import { BaseQueryCacheWriteConsumer } from './base-query-cache.write-consumer';
+import { BaseQueryCacheSubscriber } from './base-query-cache.subscriber';
 import { EnvironmentService } from '../../../integrations/environment/environment.service';
 import { seedBase, deleteSeededBase } from './testing/seed-base';
 import { PropertySchema } from '../engine';
+import { EventName } from '../../../common/events/event.contants';
+import { BaseRowUpdatedEvent } from '../events/base-events';
+import { ChangeEnvelope } from './query-cache.types';
 
 const INTEGRATION_DB_URL = process.env.INTEGRATION_DB_URL;
+const REDIS_URL = process.env.REDIS_URL ?? 'redis://localhost:6379';
 
 // Minimal EnvironmentService stand-in that only implements the methods used
 // by query-cache and the repos we touch.
@@ -44,6 +53,9 @@ class FakeEnvService {
   getBaseQueryCacheWarmTopN() {
     return 0;
   }
+  getRedisUrl() {
+    return REDIS_URL;
+  }
 }
 
 @Injectable()
@@ -65,63 +77,105 @@ function normalizePostgresUrl(url: string): string {
 
 const describeIntegration = INTEGRATION_DB_URL ?
   describe : describe.skip;
 
+async function isRedisReachable(): Promise<boolean> {
+  const probe = new Redis(REDIS_URL, {
+    lazyConnect: true,
+    maxRetriesPerRequest: 1,
+    retryStrategy: () => null,
+  });
+  try {
+    await probe.connect();
+    await probe.ping();
+    return true;
+  } catch {
+    return false;
+  } finally {
+    probe.disconnect();
+  }
+}
+
 describeIntegration('BaseQueryCacheService integration', () => {
   let moduleRef: TestingModule;
   let cache: BaseQueryCacheService;
   let baseRowRepo: BaseRowRepo;
   let basePropertyRepo: BasePropertyRepo;
   let dbHandle: DbHandle;
+  let eventEmitter: EventEmitter2;
   let seededBaseId: string | null = null;
   let workspaceId: string;
   let spaceId: string;
   let creatorUserId: string | null;
+  let redisReachable = false;
 
   beforeAll(async () => {
     process.env.DATABASE_URL = INTEGRATION_DB_URL;
+    process.env.REDIS_URL = REDIS_URL;
     process.env.BASE_QUERY_CACHE_ENABLED = 'true';
     process.env.BASE_QUERY_CACHE_MIN_ROWS = '100';
 
-    moduleRef = await Test.createTestingModule({
-      imports: [
-        ConfigModule.forRoot({ isGlobal: true }),
-        KyselyModule.forRoot({
-          dialect: new PostgresJSDialect({
-            postgres: (postgres as any)(
-              normalizePostgresUrl(INTEGRATION_DB_URL!),
-              {
-                max: 5,
-                onnotice: () => {},
-                types: {
-                  bigint: {
-                    to: 20,
-                    from: [20, 1700],
-                    serialize: (value: number) => value.toString(),
-                    parse: (value: string) => Number.parseInt(value),
-                  },
+    redisReachable = await isRedisReachable();
+
+    const imports: any[] = [
+      ConfigModule.forRoot({ isGlobal: true }),
+      KyselyModule.forRoot({
+        dialect: new PostgresJSDialect({
+          postgres: (postgres as any)(
+            normalizePostgresUrl(INTEGRATION_DB_URL!),
+            {
+              max: 5,
+              onnotice: () => {},
+              types: {
+                bigint: {
+                  to: 20,
+                  from: [20, 1700],
+                  serialize: (value: number) => value.toString(),
+                  parse: (value: string) => Number.parseInt(value),
                 },
               },
-            ),
-          }),
-          plugins: [new CamelCasePlugin()],
+            },
+          ),
         }),
-      ],
-      providers: [
-        { provide: EnvironmentService, useClass: FakeEnvService },
-        QueryCacheConfigProvider,
-        BaseRepo,
-        BasePropertyRepo,
-        BaseRowRepo,
-        BaseViewRepo,
-        CollectionLoader,
-        BaseQueryCacheService,
-        DbHandle,
-      ],
+        plugins: [new CamelCasePlugin()],
+      }),
+      EventEmitterModule.forRoot(),
+    ];
+
+    const providers: any[] = [
+      { provide: EnvironmentService, useClass: FakeEnvService },
+      QueryCacheConfigProvider,
+      BaseRepo,
+      BasePropertyRepo,
+      BaseRowRepo,
+      BaseViewRepo,
+      CollectionLoader,
+      BaseQueryCacheService,
+      DbHandle,
+    ];
+
+    if (redisReachable) {
+      imports.push(
+        RedisModule.forRoot({
+          readyLog: false,
+          config: { host: '127.0.0.1', port: 6379 },
+        }),
+      );
+      providers.push(BaseQueryCacheWriteConsumer, BaseQueryCacheSubscriber);
+    }
+
+    moduleRef = await Test.createTestingModule({
+      imports,
+      providers,
     }).compile();
 
+    if (redisReachable) {
+      await moduleRef.init();
+    }
+
     cache = moduleRef.get(BaseQueryCacheService);
     baseRowRepo = moduleRef.get(BaseRowRepo);
     basePropertyRepo = moduleRef.get(BasePropertyRepo);
     dbHandle = moduleRef.get(DbHandle);
+    eventEmitter = moduleRef.get(EventEmitter2);
 
     const workspace = await dbHandle.db
       .selectFrom('workspaces')
@@ -219,4 +273,125 @@ describeIntegration('BaseQueryCacheService integration', () => {
     },
     60_000,
   );
+
+  it(
+    'applyChange upsert reflects in subsequent list reads',
+    async () => {
+      const baseId = seededBaseId!;
+      const properties = await basePropertyRepo.findByBaseId(baseId);
+      const schema: PropertySchema = new Map(properties.map((p) => [p.id, p]));
+      const estimateProp = properties.find((p) => p.name === 'Estimate');
+      if (!estimateProp)
+        throw new Error('Estimate property not found');
+
+      // Force the collection to load.
+      const firstPage = await cache.list(baseId, workspaceId, {
+        schema,
+        pagination: { limit: 1 } as any,
+      });
+      expect(firstPage.items.length).toBe(1);
+      const targetRowId = firstPage.items[0].id;
+
+      // Patch the row directly in Postgres and apply the envelope via the
+      // cache service's public API (pubsub-free — deterministic).
+      const nextEstimate = 424242;
+      const pgRow = await baseRowRepo.findById(targetRowId, { workspaceId });
+      if (!pgRow) throw new Error('Row not found');
+      const newCells = {
+        ...(pgRow.cells as Record<string, unknown>),
+        [estimateProp.id]: nextEstimate,
+      };
+      await dbHandle.db
+        .updateTable('baseRows')
+        .set({ cells: newCells as any })
+        .where('id', '=', targetRowId)
+        .where('workspaceId', '=', workspaceId)
+        .execute();
+
+      const envelope: ChangeEnvelope = {
+        kind: 'row-upsert',
+        baseId,
+        row: { ...pgRow, cells: newCells } as unknown as Record<string, unknown>,
+      };
+      await cache.applyChange(envelope);
+
+      // Read back via DuckDB with a filter that should only match the
+      // freshly-patched value.
+      const page = await cache.list(baseId, workspaceId, {
+        schema,
+        filter: {
+          propertyId: estimateProp.id,
+          op: 'eq',
+          value: nextEstimate,
+        } as any,
+        pagination: { limit: 5 } as any,
+      });
+      const ids = page.items.map((r) => r.id);
+      expect(ids).toContain(targetRowId);
+    },
+    60_000,
+  );
+
+  it(
+    'pubsub round-trip: BASE_ROW_UPDATED event propagates to DuckDB',
+    async () => {
+      if (!redisReachable) {
+        console.warn('Skipping pubsub round-trip: Redis not reachable');
+        return;
+      }
+      const baseId = seededBaseId!;
+      const properties = await basePropertyRepo.findByBaseId(baseId);
+      const schema: PropertySchema = new Map(properties.map((p) => [p.id, p]));
+      const estimateProp = properties.find((p) => p.name === 'Estimate');
+      if (!estimateProp) throw new Error('Estimate property not found');
+
+      // Force the collection to load.
+      const firstPage = await cache.list(baseId, workspaceId, {
+        schema,
+        pagination: { limit: 1 } as any,
+      });
+      expect(firstPage.items.length).toBe(1);
+      const targetRowId = firstPage.items[0].id;
+
+      const nextEstimate = 999_001;
+      const pgRow = await baseRowRepo.findById(targetRowId, { workspaceId });
+      if (!pgRow) throw new Error('Row not found');
+      const newCells = {
+        ...(pgRow.cells as Record<string, unknown>),
+        [estimateProp.id]: nextEstimate,
+      };
+      await dbHandle.db
+        .updateTable('baseRows')
+        .set({ cells: newCells as any })
+        .where('id', '=', targetRowId)
+        .where('workspaceId', '=', workspaceId)
+        .execute();
+
+      const event: BaseRowUpdatedEvent = {
+        baseId,
+        workspaceId,
+        actorId: null,
+        requestId: null,
+        rowId: targetRowId,
+        patch: { [estimateProp.id]: nextEstimate },
+        updatedCells: { [estimateProp.id]: nextEstimate },
+      };
+      eventEmitter.emit(EventName.BASE_ROW_UPDATED, event);
+
+      // Wait for Redis pubsub round-trip.
+      await new Promise((r) => setTimeout(r, 500));
+
+      const page = await cache.list(baseId, workspaceId, {
+        schema,
+        filter: {
+          propertyId: estimateProp.id,
+          op: 'eq',
+          value: nextEstimate,
+        } as any,
+        pagination: { limit: 5 } as any,
+      });
+      const ids = page.items.map((r) => r.id);
+      expect(ids).toContain(targetRowId);
+    },
+    60_000,
+  );
 });

diff --git a/apps/server/src/core/base/query-cache/base-query-cache.service.ts b/apps/server/src/core/base/query-cache/base-query-cache.service.ts
index c3eddfed..575159ba 100644
--- a/apps/server/src/core/base/query-cache/base-query-cache.service.ts
+++ b/apps/server/src/core/base/query-cache/base-query-cache.service.ts
@@ -24,7 +24,12 @@ import {
 import { QueryCacheConfigProvider } from './query-cache.config';
 import { CollectionLoader } from './collection-loader';
 import { buildDuckDbListQuery } from './duckdb-query-builder';
-import { ColumnSpec, LoadedCollection } from './query-cache.types';
+import { BasePropertyType } from '../base.schemas';
+import {
+  ChangeEnvelope,
+  ColumnSpec,
+  LoadedCollection,
+} from './query-cache.types';
 
 export type CacheListOpts = {
   filter?: FilterNode;
@@ -165,6 +170,117 @@ export class BaseQueryCacheService
     this.collections.delete(baseId);
   }
 
+  /*
+   * Apply a change envelope received from Redis pub/sub to the local
+   * collection (if any). Rows that target bases not resident on this node
+   * are ignored — the next `list` call will load them fresh from Postgres.
+   * If any patch step throws (e.g. schema drift between this node and the
+   * publisher) we eagerly invalidate so the next `list` rebuilds cleanly
+   * rather than serving partial state.
+   */
+  async applyChange(env: ChangeEnvelope): Promise<void> {
+    const collection = this.collections.get(env.baseId);
+    if (!collection) return;
+
+    try {
+      switch (env.kind) {
+        case 'schema-invalidate':
+          if (env.schemaVersion > collection.schemaVersion) {
+            await this.invalidate(env.baseId);
+          }
+          return;
+        case 'row-upsert':
+          await this.upsertRow(collection, env.row);
+          return;
+        case 'row-delete':
+          await this.deleteRow(collection, env.rowId);
+          return;
+        case 'rows-delete':
+          for (const id of env.rowIds) await this.deleteRow(collection, id);
+          return;
+        case 'row-reorder':
+          await this.updatePosition(collection, env.rowId, env.position);
+          return;
+      }
+    } catch (err) {
+      const error = err as Error;
+      this.logger.warn(
+        `applyChange failed for ${env.baseId}; invalidating: ${error.message}`,
+      );
+      if (error.stack) this.logger.warn(error.stack);
+      await this.invalidate(env.baseId);
+    }
+  }
+
+  private async upsertRow(
+    collection: LoadedCollection,
+    row: Record<string, unknown>,
+  ): Promise<void> {
+    const specs = collection.columns;
+    const columnList = specs.map((s) => quoteIdent(s.column)).join(', ');
+    const placeholders = specs.map(() => '?').join(', ');
+    const sql = `INSERT OR REPLACE INTO rows (${columnList}) VALUES (${placeholders})`;
+
+    const prepared = await collection.connection.prepare(sql);
+    for (let i = 0; i < specs.length; i++) {
+      const spec = specs[i];
+      const oneBased = i + 1;
+      const raw = readFromRowEvent(row, spec);
+      if (raw == null) {
+        prepared.bindNull(oneBased);
+        continue;
+      }
+      switch (spec.ddlType) {
+        case 'VARCHAR':
+          prepared.bindVarchar(oneBased, String(raw));
+          break;
+        case 'DOUBLE': {
+          const n = Number(raw);
+          if (Number.isNaN(n)) prepared.bindNull(oneBased);
+          else prepared.bindDouble(oneBased, n);
+          break;
+        }
+        case 'BOOLEAN':
+          prepared.bindBoolean(oneBased, Boolean(raw));
+          break;
+        case 'TIMESTAMPTZ': {
+          const d = raw instanceof Date ?
+            raw : new Date(String(raw));
+          if (Number.isNaN(d.getTime())) prepared.bindNull(oneBased);
+          else prepared.bindVarchar(oneBased, d.toISOString());
+          break;
+        }
+        case 'JSON':
+          prepared.bindVarchar(oneBased, JSON.stringify(raw));
+          break;
+      }
+    }
+    await prepared.run();
+  }
+
+  private async deleteRow(
+    collection: LoadedCollection,
+    rowId: string,
+  ): Promise<void> {
+    const prepared = await collection.connection.prepare(
+      'DELETE FROM rows WHERE id = ?',
+    );
+    prepared.bindVarchar(1, rowId);
+    await prepared.run();
+  }
+
+  private async updatePosition(
+    collection: LoadedCollection,
+    rowId: string,
+    position: string,
+  ): Promise<void> {
+    const prepared = await collection.connection.prepare(
+      'UPDATE rows SET position = ? WHERE id = ?',
+    );
+    prepared.bindVarchar(1, position);
+    prepared.bindVarchar(2, rowId);
+    await prepared.run();
+  }
+
   private async ensureLoaded(
     baseId: string,
     workspaceId: string,
@@ -304,3 +420,56 @@ function toDate(value: unknown): Date {
   if (value instanceof Date) return value;
   return new Date(String(value));
 }
+
+// System property type → system column on base_rows (mirrors the map in
+// collection-loader.ts). Kept local to avoid a circular import.
+const SYSTEM_PROPERTY_COLUMN_LOOKUP: Record<string, string> = {
+  [BasePropertyType.CREATED_AT]: 'createdAt',
+  [BasePropertyType.LAST_EDITED_AT]: 'updatedAt',
+  [BasePropertyType.LAST_EDITED_BY]: 'lastUpdatedById',
+};
+
+// Mirror of collection-loader's `readFromRow`, but keyed off a generic event
+// payload: rows may arrive in camelCase (JSON that travelled over
+// EventEmitter / Redis) or in the repo shape straight from Kysely, so both
+// the wire shape and the repo shape are tolerated.
+function readFromRowEvent(
+  row: Record<string, unknown>,
+  spec: ColumnSpec,
+): unknown {
+  switch (spec.column) {
+    case 'id':
+      return row.id;
+    case 'base_id':
+      return row.baseId ?? row.base_id;
+    case 'workspace_id':
+      return row.workspaceId ?? row.workspace_id;
+    case 'creator_id':
+      return row.creatorId ?? row.creator_id;
+    case 'position':
+      return row.position;
+    case 'created_at':
+      return row.createdAt ?? row.created_at;
+    case 'updated_at':
+      return row.updatedAt ?? row.updated_at;
+    case 'last_updated_by_id':
+      return row.lastUpdatedById ?? row.last_updated_by_id;
+    case 'deleted_at':
+      return null;
+    case 'search_text':
+      return '';
+  }
+
+  const prop = spec.property;
+  if (!prop) return null;
+
+  const sysColumn = SYSTEM_PROPERTY_COLUMN_LOOKUP[prop.type];
+  if (sysColumn) return row[sysColumn] ?? null;
+
+  const cells = (row.cells as Record<string, unknown> | null) ?? {};
+  return cells[prop.id] ??
+    null;
+}
+
+function quoteIdent(name: string): string {
+  return `"${name.replace(/"/g, '""')}"`;
+}

diff --git a/apps/server/src/core/base/query-cache/base-query-cache.subscriber.ts b/apps/server/src/core/base/query-cache/base-query-cache.subscriber.ts
new file mode 100644
index 00000000..4ad9ab63
--- /dev/null
+++ b/apps/server/src/core/base/query-cache/base-query-cache.subscriber.ts
@@ -0,0 +1,110 @@
+import {
+  Injectable,
+  Logger,
+  OnApplicationBootstrap,
+  OnModuleDestroy,
+} from '@nestjs/common';
+import Redis from 'ioredis';
+import { EnvironmentService } from '../../../integrations/environment/environment.service';
+import {
+  createRetryStrategy,
+  parseRedisUrl,
+} from '../../../common/helpers/utils';
+import { QueryCacheConfigProvider } from './query-cache.config';
+import { BaseQueryCacheService } from './base-query-cache.service';
+import { ChangeEnvelope } from './query-cache.types';
+
+const CHANNEL_PATTERN = 'base-query-cache:changes:*';
+
+/*
+ * Dedicated ioredis subscriber that forwards change envelopes to the local
+ * BaseQueryCacheService. A separate connection is required because ioredis
+ * puts subscribing clients into subscriber-only mode and the shared client
+ * from RedisService is used for normal commands elsewhere in the app.
+ * When the query-cache is disabled we do not open a Redis connection at all.
+ */
+@Injectable()
+export class BaseQueryCacheSubscriber
+  implements OnApplicationBootstrap, OnModuleDestroy
+{
+  private readonly logger = new Logger(BaseQueryCacheSubscriber.name);
+  private client: Redis | null = null;
+
+  constructor(
+    private readonly configProvider: QueryCacheConfigProvider,
+    private readonly env: EnvironmentService,
+    private readonly cacheService: BaseQueryCacheService,
+  ) {}
+
+  async onApplicationBootstrap(): Promise<void> {
+    if (!this.configProvider.config.enabled) return;
+
+    const redisUrl = this.env.getRedisUrl();
+    const { family } = parseRedisUrl(redisUrl);
+
+    this.client = new Redis(redisUrl, {
+      family,
+      retryStrategy: createRetryStrategy(),
+    });
+
+    this.client.on('error', (err) => {
+      this.logger.warn(`Subscriber client error: ${err.message}`);
+    });
+
+    this.client.on('pmessage', (_pattern, channel, message) => {
+      this.handleMessage(channel, message).catch((err) => {
+        const error = err as Error;
+        this.logger.warn(
+          `Unhandled error applying change from ${channel}: ${error.message}`,
+        );
+      });
+    });
+
+    try {
+      await this.client.psubscribe(CHANNEL_PATTERN);
+      this.logger.log(`Subscribed to ${CHANNEL_PATTERN}`);
+    } catch (err) {
+      const error = err as Error;
+      this.logger.warn(`Failed to psubscribe: ${error.message}`);
+    }
+  }
+
+  async onModuleDestroy(): Promise<void> {
+    if (!this.client) return;
+    try {
+      await this.client.quit();
+    } catch (err) {
+      const error = err as Error;
+      this.logger.warn(
+        `Failed to close subscriber client cleanly: ${error.message}`,
+      );
+    }
+    this.client = null;
+  }
+
+  private async handleMessage(
+    channel: string,
+    message: string,
+  ): Promise<void> {
+    let envelope: ChangeEnvelope;
+    try {
+      envelope = JSON.parse(message) as ChangeEnvelope;
+    } catch (err) {
+      const error = err as Error;
+      this.logger.warn(
+        `Dropping malformed cache-change message on ${channel}: ${error.message}`,
+      );
+      return;
+    }
+
+    try {
+      await this.cacheService.applyChange(envelope);
+    } catch (err) {
+      const error = err as Error;
+      this.logger.warn(
+        `applyChange failed for ${envelope.baseId}: ${error.message}`,
+      );
+      if (error.stack) this.logger.warn(error.stack);
+    }
+  }
+}
diff --git a/apps/server/src/core/base/query-cache/base-query-cache.write-consumer.ts b/apps/server/src/core/base/query-cache/base-query-cache.write-consumer.ts
new file mode 100644
index 00000000..cc95b1d8
--- /dev/null
+++ b/apps/server/src/core/base/query-cache/base-query-cache.write-consumer.ts
@@ -0,0 +1,153 @@
+import { Injectable, Logger } from '@nestjs/common';
+import { OnEvent } from '@nestjs/event-emitter';
+import { RedisService } from '@nestjs-labs/nestjs-ioredis';
+import type { Redis } from 'ioredis';
+import { EventName } from '../../../common/events/event.contants';
+import { BaseRowRepo } from '@docmost/db/repos/base/base-row.repo';
+import {
+  BasePropertyCreatedEvent,
+  BasePropertyDeletedEvent,
+  BasePropertyUpdatedEvent,
+  BaseRowCreatedEvent,
+  BaseRowDeletedEvent,
+  BaseRowReorderedEvent,
+  BaseRowUpdatedEvent,
+  BaseRowsDeletedEvent,
+  BaseSchemaBumpedEvent,
+} from '../events/base-events';
+import { QueryCacheConfigProvider } from './query-cache.config';
+import { ChangeEnvelope } from './query-cache.types';
+
+/*
+ * Bridges in-process base domain events onto a Redis pub/sub channel so every
+ * node running the query-cache can keep its resident DuckDB collections in
+ * sync. Each base gets its own channel (`base-query-cache:changes:${baseId}`)
+ * to keep pattern matching cheap. When the feature flag is off every handler
+ * returns immediately, so the consumer adds no meaningful overhead.
+ */
+@Injectable()
+export class BaseQueryCacheWriteConsumer {
+  private readonly logger = new Logger(BaseQueryCacheWriteConsumer.name);
+  private readonly redis: Redis;
+
+  constructor(
+    private readonly redisService: RedisService,
+    private readonly configProvider: QueryCacheConfigProvider,
+    private readonly baseRowRepo: BaseRowRepo,
+  ) {
+    this.redis = this.redisService.getOrThrow();
+  }
+
+  @OnEvent(EventName.BASE_ROW_CREATED)
+  async onRowCreated(e: BaseRowCreatedEvent): Promise<void> {
+    if (!this.configProvider.config.enabled) return;
+    await this.publish(e.baseId, {
+      kind: 'row-upsert',
+      baseId: e.baseId,
+      row: e.row as unknown as Record<string, unknown>,
+    });
+  }
+
+  @OnEvent(EventName.BASE_ROW_UPDATED)
+  async onRowUpdated(e: BaseRowUpdatedEvent): Promise<void> {
+    if (!this.configProvider.config.enabled) return;
+    const row = await this.baseRowRepo.findById(e.rowId, {
+      workspaceId: e.workspaceId,
+    });
+    if (!row) return;
+    await this.publish(e.baseId, {
+      kind: 'row-upsert',
+      baseId: e.baseId,
+      row: row as unknown as Record<string, unknown>,
+    });
+  }
+
+  @OnEvent(EventName.BASE_ROW_DELETED)
+  async onRowDeleted(e: BaseRowDeletedEvent): Promise<void> {
+    if (!this.configProvider.config.enabled) return;
+    await this.publish(e.baseId, {
+      kind: 'row-delete',
+      baseId: e.baseId,
+      rowId: e.rowId,
+    });
+  }
+
+  @OnEvent(EventName.BASE_ROWS_DELETED)
+  async onRowsDeleted(e: BaseRowsDeletedEvent): Promise<void> {
+    if (!this.configProvider.config.enabled) return;
+    await this.publish(e.baseId, {
+      kind: 'rows-delete',
+      baseId: e.baseId,
+      rowIds: e.rowIds,
+    });
+  }
+
+  @OnEvent(EventName.BASE_ROW_REORDERED)
+  async onRowReordered(e: BaseRowReorderedEvent): Promise<void> {
+    if (!this.configProvider.config.enabled) return;
+    await this.publish(e.baseId, {
+      kind: 'row-reorder',
+      baseId: e.baseId,
+      rowId: e.rowId,
+      position: e.position,
+    });
+  }
+
+  @OnEvent(EventName.BASE_SCHEMA_BUMPED)
+  async onSchemaBumped(e: BaseSchemaBumpedEvent): Promise<void> {
+    if (!this.configProvider.config.enabled) return;
+    await this.publish(e.baseId, {
+      kind: 'schema-invalidate',
+      baseId: e.baseId,
+      schemaVersion: e.schemaVersion,
+    });
+  }
+
+  @OnEvent(EventName.BASE_PROPERTY_UPDATED)
+  async onPropertyUpdated(e: BasePropertyUpdatedEvent): Promise<void> {
+    if (!this.configProvider.config.enabled) return;
+    await this.publish(e.baseId, {
+      kind: 'schema-invalidate',
+      baseId: e.baseId,
+      schemaVersion: e.schemaVersion,
+    });
+  }
+
+  @OnEvent(EventName.BASE_PROPERTY_CREATED)
+  async onPropertyCreated(e: BasePropertyCreatedEvent): Promise<void> {
+    if (!this.configProvider.config.enabled) return;
+    // Property creation doesn't carry a schemaVersion in its payload; publish
+    // Number.MAX_SAFE_INTEGER so applyChange always sees the resident
+    // collection as stale and triggers invalidation.
+    await this.publish(e.baseId, {
+      kind: 'schema-invalidate',
+      baseId: e.baseId,
+      schemaVersion: Number.MAX_SAFE_INTEGER,
+    });
+  }
+
+  @OnEvent(EventName.BASE_PROPERTY_DELETED)
+  async onPropertyDeleted(e: BasePropertyDeletedEvent): Promise<void> {
+    if (!this.configProvider.config.enabled) return;
+    await this.publish(e.baseId, {
+      kind: 'schema-invalidate',
+      baseId: e.baseId,
+      schemaVersion: Number.MAX_SAFE_INTEGER,
+    });
+  }
+
+  private async publish(
+    baseId: string,
+    envelope: ChangeEnvelope,
+  ): Promise<void> {
+    const channel = `base-query-cache:changes:${baseId}`;
+    try {
+      await this.redis.publish(channel, JSON.stringify(envelope));
+    } catch (err) {
+      const error = err as Error;
+      this.logger.warn(
+        `Failed to publish cache change for ${baseId}: ${error.message}`,
+      );
+    }
+  }
+}

diff --git a/apps/server/src/core/base/query-cache/query-cache.module.ts b/apps/server/src/core/base/query-cache/query-cache.module.ts
index 19f1405d..0daa7a25 100644
--- a/apps/server/src/core/base/query-cache/query-cache.module.ts
+++ b/apps/server/src/core/base/query-cache/query-cache.module.ts
@@ -3,6 +3,8 @@ import { QueryCacheConfigProvider } from './query-cache.config';
 import { BaseQueryCacheService } from './base-query-cache.service';
 import { BaseQueryRouter } from './base-query-router';
 import { CollectionLoader } from './collection-loader';
+import { BaseQueryCacheWriteConsumer } from './base-query-cache.write-consumer';
+import { BaseQueryCacheSubscriber } from './base-query-cache.subscriber';
 
 @Module({
   providers: [
@@ -10,6 +12,8 @@ import { CollectionLoader } from './collection-loader';
     BaseQueryCacheService,
     BaseQueryRouter,
     CollectionLoader,
+    BaseQueryCacheWriteConsumer,
+    BaseQueryCacheSubscriber,
   ],
   exports: [BaseQueryCacheService, BaseQueryRouter, QueryCacheConfigProvider],
 })