feat(base): add FormulaService, FormulaLockService, recompute job type

This commit is contained in:
Philipinho
2026-04-24 00:13:20 +01:00
parent 493613e634
commit 2da8779b34
6 changed files with 229 additions and 0 deletions
+6
View File
@@ -15,6 +15,8 @@ import { BaseWsService } from './realtime/base-ws.service';
import { BaseWsConsumers } from './realtime/base-ws-consumers';
import { BasePresenceService } from './realtime/base-presence.service';
import { QueueName } from '../../integrations/queue/constants';
import { FormulaService } from './formula/formula.service';
import { FormulaLockService } from './formula/formula-lock';
@Module({
imports: [BullModule.registerQueue({ name: QueueName.BASE_QUEUE })],
@@ -35,6 +37,8 @@ import { QueueName } from '../../integrations/queue/constants';
BasePresenceService,
BaseWsService,
BaseWsConsumers,
FormulaService,
FormulaLockService,
],
exports: [
BaseService,
@@ -43,6 +47,8 @@ import { QueueName } from '../../integrations/queue/constants';
BaseViewService,
BaseWsService,
BasePresenceService,
FormulaService,
FormulaLockService,
],
})
export class BaseModule {}
@@ -0,0 +1,59 @@
import { Injectable, Logger } from "@nestjs/common";
import { RedisService } from "@nestjs-labs/nestjs-ioredis";
import type { Redis } from "ioredis";
const LOCK_PREFIX = "base-formula-recompute-lock:";
const LOCK_TTL_MS = 15 * 60 * 1000; // 15 min — longer than any realistic backfill
@Injectable()
export class FormulaLockService {
private readonly logger = new Logger(FormulaLockService.name);
private readonly redis: Redis;
constructor(private readonly redisService: RedisService) {
this.redis = this.redisService.getOrThrow();
}
/*
* Returns a release token on success, or null if the lock is held. Callers
* must pass the token back to release() to prevent cross-holder releases.
*/
async acquire(baseId: string): Promise<string | null> {
const token = `${Date.now()}-${Math.random()}`;
const ok = await this.redis.set(
LOCK_PREFIX + baseId,
token,
"PX",
LOCK_TTL_MS,
"NX",
);
return ok === "OK" ? token : null;
}
async release(baseId: string, token: string): Promise<void> {
const lua = `
if redis.call("GET", KEYS[1]) == ARGV[1] then
return redis.call("DEL", KEYS[1])
else
return 0
end
`;
await this.redis.eval(lua, 1, LOCK_PREFIX + baseId, token);
}
/*
* Waits for the lock with a simple polling loop. Returns the token or null
* on timeout. Workers call this at job start — if acquisition times out
* the job is retried by BullMQ.
*/
async acquireWait(baseId: string, opts: { timeoutMs: number; pollMs?: number }): Promise<string | null> {
const deadline = Date.now() + opts.timeoutMs;
const poll = opts.pollMs ?? 500;
while (Date.now() < deadline) {
const t = await this.acquire(baseId);
if (t) return t;
await new Promise((r) => setTimeout(r, poll));
}
return null;
}
}
@@ -0,0 +1,2 @@
// apps/server/src/core/base/formula/formula.constants.ts
export const FORMULA_INLINE_ROW_THRESHOLD = 50;
@@ -0,0 +1,144 @@
import { BadRequestException, Injectable, Logger } from "@nestjs/common";
import { InjectQueue } from "@nestjs/bullmq";
import { Queue } from "bullmq";
import {
BaseFormulaGraph,
evaluate,
registry,
parseRaw,
resolve,
typecheck,
DEFAULT_MAX_DEPTH,
makeErrorCell,
type FormulaAST,
type FormulaTypeOptions,
type PropertyLookup,
type Value,
} from "@docmost/base-formula/server";
import {
QueueJob,
QueueName,
} from "../../../integrations/queue/constants";
import { IBaseFormulaRecomputeJob } from "../../../integrations/queue/constants/queue.interface";
import { BaseProperty } from "@docmost/db/types/entity.types";
import { FormulaParseError } from "@docmost/base-formula/server";
import { FORMULA_INLINE_ROW_THRESHOLD } from "./formula.constants";
@Injectable()
export class FormulaService {
private readonly logger = new Logger(FormulaService.name);
constructor(
@InjectQueue(QueueName.BASE_QUEUE) private readonly queue: Queue,
) {}
get inlineThreshold(): number { return FORMULA_INLINE_ROW_THRESHOLD; }
/*
* Parses a raw source string against the given property set. Used by the
* property create/update service when a formula is saved. Returns the
* canonical FormulaTypeOptions ready to persist, or throws a
* BadRequestException built from the parse errors.
*/
compile(source: string, properties: BaseProperty[]): FormulaTypeOptions {
const nameToId = new Map(properties.map((p) => [p.name, p.id]));
try {
const raw = parseRaw(source);
const resolved = resolve(raw, nameToId);
const typeMap = new Map(
properties.map((p) => [p.id, asResultType(p.type)]),
);
const { resultType } = typecheck(resolved.ast, typeMap, registry);
return {
source,
ast: resolved.ast,
resultType,
dependencies: resolved.dependencies,
astVersion: 1,
};
} catch (e) {
if (e instanceof FormulaParseError) {
throw new BadRequestException({ message: "Invalid formula", errors: e.errors });
}
throw e;
}
}
/*
* Returns the cycle path if the candidate property (post-edit) would
* introduce a cycle, else null. Used before save.
*/
detectCycle(candidate: BaseProperty, allProperties: BaseProperty[]): string[] | null {
const others = allProperties.filter((p) => p.id !== candidate.id);
const graph = new BaseFormulaGraph([...others, candidate]);
return graph.detectCycle(candidate);
}
/*
* Same-row inline evaluation. Returns a patch containing only the cells
* that changed (or errored) due to formula evaluation. Caller merges
* into the user-provided patch and persists.
*/
evaluateInline(args: {
properties: BaseProperty[];
row: Record<string, unknown>;
dirtyProps: string[];
}): Record<string, Value> {
const graph = new BaseFormulaGraph(args.properties);
const affected = graph.affectedFormulas(args.dirtyProps);
if (affected.length === 0) return {};
const ctx = {
registry,
properties: this.buildPropertyLookup(args.properties),
depth: 0,
maxDepth: DEFAULT_MAX_DEPTH,
memo: new Map<string, Value>(),
};
const order = graph.evalOrder().filter((id) => affected.includes(id));
const patch: Record<string, Value> = {};
for (const propId of order) {
const prop = args.properties.find((p) => p.id === propId);
if (!prop || prop.type !== "formula") continue;
const opts = prop.typeOptions as FormulaTypeOptions;
try {
patch[propId] = evaluate(opts.ast as FormulaAST, { ...args.row, ...patch }, ctx);
} catch (e) {
patch[propId] = makeErrorCell("TYPE_MISMATCH", (e as Error).message);
}
}
return patch;
}
/*
* Enqueue a full recompute for the given formula property IDs on the given
* base. Reasons let the worker log why the job ran. Job ID includes baseId
* so BullMQ will dedupe when the same base has multiple edits in flight —
* see FormulaLock for the per-base Redis serialization.
*/
async enqueueRecompute(args: IBaseFormulaRecomputeJob): Promise<void> {
await this.queue.add(QueueJob.BASE_FORMULA_RECOMPUTE, args, {
jobId: `formula-recompute:${args.baseId}:${Date.now()}`,
removeOnComplete: 1000,
removeOnFail: 1000,
});
}
private buildPropertyLookup(props: BaseProperty[]): ReadonlyMap<string, PropertyLookup> {
return new Map(props.map((p) => [p.id, {
id: p.id,
type: p.type,
typeOptions: p.typeOptions,
}]));
}
}
function asResultType(type: string): "number" | "string" | "boolean" | "date" | "null" {
if (type === "number") return "number";
if (type === "text" || type === "url" || type === "email") return "string";
if (type === "checkbox") return "boolean";
if (type === "date" || type === "createdAt" || type === "lastEditedAt") return "date";
if (type === "formula") return "number"; // overridden by the nested formula's own resultType at runtime
return "null";
}
@@ -87,4 +87,5 @@ export enum QueueJob {
BASE_TYPE_CONVERSION = 'base-type-conversion',
BASE_CELL_GC = 'base-cell-gc',
BASE_FORMULA_RECOMPUTE = 'base-formula-recompute',
}
@@ -137,3 +137,20 @@ export interface IBaseCellGcJob {
propertyId: string;
workspaceId: string;
}
export interface IBaseFormulaRecomputeJob {
baseId: string;
workspaceId: string;
propertyIds: string[]; // formula properties to recompute
reason:
| 'formula_created'
| 'formula_edited'
| 'dep_type_changed'
| 'dep_deleted'
| 'bulk_import'
| 'manual';
actorId?: string | null;
// When set, scope recompute to these row IDs instead of the whole base.
// Used by the bulk-write path (> FORMULA_INLINE_ROW_THRESHOLD).
rowIds?: string[];
}