feat(base): wire formula recompute job into queue processor

This commit is contained in:
Philipinho
2026-04-24 00:21:20 +01:00
parent 46386bf4e1
commit fbee344e96
3 changed files with 94 additions and 0 deletions
@@ -37,4 +37,7 @@ export enum EventName {
BASE_VIEW_DELETED = 'base.view.deleted',
BASE_SCHEMA_BUMPED = 'base.schema.bumped',
BASE_ROWS_UPDATED = 'base.rows.updated',
BASE_FORMULA_RECOMPUTE_STARTED = 'base.formula.recompute.started',
BASE_FORMULA_RECOMPUTE_COMPLETED = 'base.formula.recompute.completed',
}
@@ -46,3 +46,20 @@ export type BaseViewUpdatedEvent = BaseEventBase & { view: BaseView };
export type BaseViewDeletedEvent = BaseEventBase & { viewId: string };
export type BaseSchemaBumpedEvent = BaseEventBase & { schemaVersion: number };
export type BaseRowsUpdatedEvent = BaseEventBase & {
rowIds: string[];
propertyIds: string[];
};
export type BaseFormulaRecomputeStartedEvent = BaseEventBase & {
propertyIds: string[];
jobId: string;
};
export type BaseFormulaRecomputeCompletedEvent = BaseEventBase & {
propertyIds: string[];
jobId: string;
processed: number;
errored: number;
};
@@ -11,14 +11,20 @@ import { BaseRepo } from '@docmost/db/repos/base/base.repo';
import { QueueJob, QueueName } from '../../../integrations/queue/constants';
import {
IBaseCellGcJob,
IBaseFormulaRecomputeJob,
IBaseTypeConversionJob,
} from '../../../integrations/queue/constants/queue.interface';
import { processBaseTypeConversion } from '../tasks/base-type-conversion.task';
import { processBaseCellGc } from '../tasks/base-cell-gc.task';
import { processBaseFormulaRecompute } from '../tasks/base-formula-recompute.task';
import { FormulaLockService } from '../formula/formula-lock';
import { EventName } from '../../../common/events/event.contants';
import {
BasePropertyUpdatedEvent,
BaseSchemaBumpedEvent,
BaseFormulaRecomputeStartedEvent,
BaseFormulaRecomputeCompletedEvent,
BaseRowsUpdatedEvent,
} from '../events/base-events';
@Processor(QueueName.BASE_QUEUE)
@@ -34,6 +40,7 @@ export class BaseQueueProcessor
private readonly basePropertyRepo: BasePropertyRepo,
private readonly baseRepo: BaseRepo,
private readonly eventEmitter: EventEmitter2,
private readonly formulaLock: FormulaLockService,
) {
super();
}
@@ -104,6 +111,73 @@ export class BaseQueueProcessor
this.emitSchemaBumped(data.baseId, data.workspaceId, schemaVersion);
return;
}
case QueueJob.BASE_FORMULA_RECOMPUTE: {
const data = job.data as IBaseFormulaRecomputeJob;
const token = await this.formulaLock.acquireWait(data.baseId, {
timeoutMs: 30_000,
});
if (!token) {
throw new Error(
`formula recompute: lock acquire timeout for base ${data.baseId}`,
);
}
try {
this.eventEmitter.emit(EventName.BASE_FORMULA_RECOMPUTE_STARTED, {
baseId: data.baseId,
workspaceId: data.workspaceId,
actorId: data.actorId ?? null,
requestId: null,
propertyIds: data.propertyIds,
jobId: String(job.id ?? ''),
} satisfies BaseFormulaRecomputeStartedEvent);
const result = await processBaseFormulaRecompute(
this.db,
this.baseRowRepo,
this.basePropertyRepo,
data,
{
progress: (processed) => job.updateProgress({ processed }),
onBatch: async (batch) => {
this.eventEmitter.emit(EventName.BASE_ROWS_UPDATED, {
baseId: data.baseId,
workspaceId: data.workspaceId,
actorId: null,
requestId: null,
rowIds: batch.map((b) => b.id),
propertyIds: data.propertyIds,
} satisfies BaseRowsUpdatedEvent);
},
},
);
const schemaVersion = await this.baseRepo.bumpSchemaVersion(
data.baseId,
);
this.eventEmitter.emit(EventName.BASE_SCHEMA_BUMPED, {
baseId: data.baseId,
workspaceId: data.workspaceId,
actorId: data.actorId ?? null,
requestId: null,
schemaVersion,
} satisfies BaseSchemaBumpedEvent);
this.eventEmitter.emit(EventName.BASE_FORMULA_RECOMPUTE_COMPLETED, {
baseId: data.baseId,
workspaceId: data.workspaceId,
actorId: data.actorId ?? null,
requestId: null,
propertyIds: data.propertyIds,
jobId: String(job.id ?? ''),
processed: result.processed,
errored: result.errored,
} satisfies BaseFormulaRecomputeCompletedEvent);
return result;
} finally {
await this.formulaLock.release(data.baseId, token);
}
}
default:
this.logger.warn(`Unknown job: ${job.name}`);
}