diff --git a/apps/server/src/common/events/event.contants.ts b/apps/server/src/common/events/event.contants.ts index 39e33f68f..3a0ecba17 100644 --- a/apps/server/src/common/events/event.contants.ts +++ b/apps/server/src/common/events/event.contants.ts @@ -37,4 +37,7 @@ export enum EventName { BASE_VIEW_DELETED = 'base.view.deleted', BASE_SCHEMA_BUMPED = 'base.schema.bumped', + BASE_ROWS_UPDATED = 'base.rows.updated', + BASE_FORMULA_RECOMPUTE_STARTED = 'base.formula.recompute.started', + BASE_FORMULA_RECOMPUTE_COMPLETED = 'base.formula.recompute.completed', } diff --git a/apps/server/src/core/base/events/base-events.ts b/apps/server/src/core/base/events/base-events.ts index 0e5a97671..6ecfd3c57 100644 --- a/apps/server/src/core/base/events/base-events.ts +++ b/apps/server/src/core/base/events/base-events.ts @@ -46,3 +46,20 @@ export type BaseViewUpdatedEvent = BaseEventBase & { view: BaseView }; export type BaseViewDeletedEvent = BaseEventBase & { viewId: string }; export type BaseSchemaBumpedEvent = BaseEventBase & { schemaVersion: number }; + +export type BaseRowsUpdatedEvent = BaseEventBase & { + rowIds: string[]; + propertyIds: string[]; +}; + +export type BaseFormulaRecomputeStartedEvent = BaseEventBase & { + propertyIds: string[]; + jobId: string; +}; + +export type BaseFormulaRecomputeCompletedEvent = BaseEventBase & { + propertyIds: string[]; + jobId: string; + processed: number; + errored: number; +}; diff --git a/apps/server/src/core/base/processors/base-queue.processor.ts b/apps/server/src/core/base/processors/base-queue.processor.ts index 514ac93a2..13bfb3457 100644 --- a/apps/server/src/core/base/processors/base-queue.processor.ts +++ b/apps/server/src/core/base/processors/base-queue.processor.ts @@ -11,14 +11,20 @@ import { BaseRepo } from '@docmost/db/repos/base/base.repo'; import { QueueJob, QueueName } from '../../../integrations/queue/constants'; import { IBaseCellGcJob, + IBaseFormulaRecomputeJob, IBaseTypeConversionJob, } from '../../../integrations/queue/constants/queue.interface'; import { processBaseTypeConversion } from '../tasks/base-type-conversion.task'; import { processBaseCellGc } from '../tasks/base-cell-gc.task'; +import { processBaseFormulaRecompute } from '../tasks/base-formula-recompute.task'; +import { FormulaLockService } from '../formula/formula-lock'; import { EventName } from '../../../common/events/event.contants'; import { BasePropertyUpdatedEvent, BaseSchemaBumpedEvent, + BaseFormulaRecomputeStartedEvent, + BaseFormulaRecomputeCompletedEvent, + BaseRowsUpdatedEvent, } from '../events/base-events'; @Processor(QueueName.BASE_QUEUE) @@ -34,6 +40,7 @@ export class BaseQueueProcessor private readonly basePropertyRepo: BasePropertyRepo, private readonly baseRepo: BaseRepo, private readonly eventEmitter: EventEmitter2, + private readonly formulaLock: FormulaLockService, ) { super(); } @@ -104,6 +111,73 @@ export class BaseQueueProcessor this.emitSchemaBumped(data.baseId, data.workspaceId, schemaVersion); return; } + case QueueJob.BASE_FORMULA_RECOMPUTE: { + const data = job.data as IBaseFormulaRecomputeJob; + const token = await this.formulaLock.acquireWait(data.baseId, { + timeoutMs: 30_000, + }); + if (!token) { + throw new Error( + `formula recompute: lock acquire timeout for base ${data.baseId}`, + ); + } + try { + this.eventEmitter.emit(EventName.BASE_FORMULA_RECOMPUTE_STARTED, { + baseId: data.baseId, + workspaceId: data.workspaceId, + actorId: data.actorId ?? null, + requestId: null, + propertyIds: data.propertyIds, + jobId: String(job.id ?? ''), + } satisfies BaseFormulaRecomputeStartedEvent); + + const result = await processBaseFormulaRecompute( + this.db, + this.baseRowRepo, + this.basePropertyRepo, + data, + { + progress: (processed) => job.updateProgress({ processed }), + onBatch: async (batch) => { + this.eventEmitter.emit(EventName.BASE_ROWS_UPDATED, { + baseId: data.baseId, + workspaceId: data.workspaceId, + actorId: null, + requestId: null, + rowIds: batch.map((b) => b.id), + propertyIds: data.propertyIds, + } satisfies BaseRowsUpdatedEvent); + }, + }, + ); + + const schemaVersion = await this.baseRepo.bumpSchemaVersion( + data.baseId, + ); + this.eventEmitter.emit(EventName.BASE_SCHEMA_BUMPED, { + baseId: data.baseId, + workspaceId: data.workspaceId, + actorId: data.actorId ?? null, + requestId: null, + schemaVersion, + } satisfies BaseSchemaBumpedEvent); + + this.eventEmitter.emit(EventName.BASE_FORMULA_RECOMPUTE_COMPLETED, { + baseId: data.baseId, + workspaceId: data.workspaceId, + actorId: data.actorId ?? null, + requestId: null, + propertyIds: data.propertyIds, + jobId: String(job.id ?? ''), + processed: result.processed, + errored: result.errored, + } satisfies BaseFormulaRecomputeCompletedEvent); + + return result; + } finally { + await this.formulaLock.release(data.baseId, token); + } + } default: this.logger.warn(`Unknown job: ${job.name}`); }