diff --git a/apps/server/src/core/base/base.module.ts b/apps/server/src/core/base/base.module.ts index f1b9a1bc9..17ba63b2a 100644 --- a/apps/server/src/core/base/base.module.ts +++ b/apps/server/src/core/base/base.module.ts @@ -15,6 +15,8 @@ import { BaseWsService } from './realtime/base-ws.service'; import { BaseWsConsumers } from './realtime/base-ws-consumers'; import { BasePresenceService } from './realtime/base-presence.service'; import { QueueName } from '../../integrations/queue/constants'; +import { FormulaService } from './formula/formula.service'; +import { FormulaLockService } from './formula/formula-lock'; @Module({ imports: [BullModule.registerQueue({ name: QueueName.BASE_QUEUE })], @@ -35,6 +37,8 @@ import { QueueName } from '../../integrations/queue/constants'; BasePresenceService, BaseWsService, BaseWsConsumers, + FormulaService, + FormulaLockService, ], exports: [ BaseService, @@ -43,6 +47,8 @@ import { QueueName } from '../../integrations/queue/constants'; BaseViewService, BaseWsService, BasePresenceService, + FormulaService, + FormulaLockService, ], }) export class BaseModule {} diff --git a/apps/server/src/core/base/formula/formula-lock.ts b/apps/server/src/core/base/formula/formula-lock.ts new file mode 100644 index 000000000..66776f777 --- /dev/null +++ b/apps/server/src/core/base/formula/formula-lock.ts @@ -0,0 +1,59 @@ +import { Injectable, Logger } from "@nestjs/common"; +import { RedisService } from "@nestjs-labs/nestjs-ioredis"; +import type { Redis } from "ioredis"; + +const LOCK_PREFIX = "base-formula-recompute-lock:"; +const LOCK_TTL_MS = 15 * 60 * 1000; // 15 min — longer than any realistic backfill + +@Injectable() +export class FormulaLockService { + private readonly logger = new Logger(FormulaLockService.name); + private readonly redis: Redis; + + constructor(private readonly redisService: RedisService) { + this.redis = this.redisService.getOrThrow(); + } + + /* + * Returns a release token on success, or null if the lock is held. Callers + * must pass the token back to release() to prevent cross-holder releases. + */ + async acquire(baseId: string): Promise { + const token = `${Date.now()}-${Math.random()}`; + const ok = await this.redis.set( + LOCK_PREFIX + baseId, + token, + "PX", + LOCK_TTL_MS, + "NX", + ); + return ok === "OK" ? token : null; + } + + async release(baseId: string, token: string): Promise { + const lua = ` + if redis.call("GET", KEYS[1]) == ARGV[1] then + return redis.call("DEL", KEYS[1]) + else + return 0 + end + `; + await this.redis.eval(lua, 1, LOCK_PREFIX + baseId, token); + } + + /* + * Waits for the lock with a simple polling loop. Returns the token or null + * on timeout. Workers call this at job start — if acquisition times out + * the job is retried by BullMQ. + */ + async acquireWait(baseId: string, opts: { timeoutMs: number; pollMs?: number }): Promise { + const deadline = Date.now() + opts.timeoutMs; + const poll = opts.pollMs ?? 500; + while (Date.now() < deadline) { + const t = await this.acquire(baseId); + if (t) return t; + await new Promise((r) => setTimeout(r, poll)); + } + return null; + } +} diff --git a/apps/server/src/core/base/formula/formula.constants.ts b/apps/server/src/core/base/formula/formula.constants.ts new file mode 100644 index 000000000..9cc78a87d --- /dev/null +++ b/apps/server/src/core/base/formula/formula.constants.ts @@ -0,0 +1,2 @@ +// apps/server/src/core/base/formula/formula.constants.ts +export const FORMULA_INLINE_ROW_THRESHOLD = 50; diff --git a/apps/server/src/core/base/formula/formula.service.ts b/apps/server/src/core/base/formula/formula.service.ts new file mode 100644 index 000000000..e879f6434 --- /dev/null +++ b/apps/server/src/core/base/formula/formula.service.ts @@ -0,0 +1,144 @@ +import { BadRequestException, Injectable, Logger } from "@nestjs/common"; +import { InjectQueue } from "@nestjs/bullmq"; +import { Queue } from "bullmq"; +import { + BaseFormulaGraph, + evaluate, + registry, + parseRaw, + resolve, + typecheck, + DEFAULT_MAX_DEPTH, + makeErrorCell, + type FormulaAST, + type FormulaTypeOptions, + type PropertyLookup, + type Value, +} from "@docmost/base-formula/server"; +import { + QueueJob, + QueueName, +} from "../../../integrations/queue/constants"; +import { IBaseFormulaRecomputeJob } from "../../../integrations/queue/constants/queue.interface"; +import { BaseProperty } from "@docmost/db/types/entity.types"; +import { FormulaParseError } from "@docmost/base-formula/server"; +import { FORMULA_INLINE_ROW_THRESHOLD } from "./formula.constants"; + +@Injectable() +export class FormulaService { + private readonly logger = new Logger(FormulaService.name); + + constructor( + @InjectQueue(QueueName.BASE_QUEUE) private readonly queue: Queue, + ) {} + + get inlineThreshold(): number { return FORMULA_INLINE_ROW_THRESHOLD; } + + /* + * Parses a raw source string against the given property set. Used by the + * property create/update service when a formula is saved. Returns the + * canonical FormulaTypeOptions ready to persist, or throws a + * BadRequestException built from the parse errors. + */ + compile(source: string, properties: BaseProperty[]): FormulaTypeOptions { + const nameToId = new Map(properties.map((p) => [p.name, p.id])); + try { + const raw = parseRaw(source); + const resolved = resolve(raw, nameToId); + const typeMap = new Map( + properties.map((p) => [p.id, asResultType(p.type)]), + ); + const { resultType } = typecheck(resolved.ast, typeMap, registry); + return { + source, + ast: resolved.ast, + resultType, + dependencies: resolved.dependencies, + astVersion: 1, + }; + } catch (e) { + if (e instanceof FormulaParseError) { + throw new BadRequestException({ message: "Invalid formula", errors: e.errors }); + } + throw e; + } + } + + /* + * Returns the cycle path if the candidate property (post-edit) would + * introduce a cycle, else null. Used before save. + */ + detectCycle(candidate: BaseProperty, allProperties: BaseProperty[]): string[] | null { + const others = allProperties.filter((p) => p.id !== candidate.id); + const graph = new BaseFormulaGraph([...others, candidate]); + return graph.detectCycle(candidate); + } + + /* + * Same-row inline evaluation. Returns a patch containing only the cells + * that changed (or errored) due to formula evaluation. Caller merges + * into the user-provided patch and persists. + */ + evaluateInline(args: { + properties: BaseProperty[]; + row: Record; + dirtyProps: string[]; + }): Record { + const graph = new BaseFormulaGraph(args.properties); + const affected = graph.affectedFormulas(args.dirtyProps); + if (affected.length === 0) return {}; + + const ctx = { + registry, + properties: this.buildPropertyLookup(args.properties), + depth: 0, + maxDepth: DEFAULT_MAX_DEPTH, + memo: new Map(), + }; + + const order = graph.evalOrder().filter((id) => affected.includes(id)); + const patch: Record = {}; + for (const propId of order) { + const prop = args.properties.find((p) => p.id === propId); + if (!prop || prop.type !== "formula") continue; + const opts = prop.typeOptions as FormulaTypeOptions; + try { + patch[propId] = evaluate(opts.ast as FormulaAST, { ...args.row, ...patch }, ctx); + } catch (e) { + patch[propId] = makeErrorCell("TYPE_MISMATCH", (e as Error).message); + } + } + return patch; + } + + /* + * Enqueue a full recompute for the given formula property IDs on the given + * base. Reasons let the worker log why the job ran. Job ID includes baseId + * so BullMQ will dedupe when the same base has multiple edits in flight — + * see FormulaLock for the per-base Redis serialization. + */ + async enqueueRecompute(args: IBaseFormulaRecomputeJob): Promise { + await this.queue.add(QueueJob.BASE_FORMULA_RECOMPUTE, args, { + jobId: `formula-recompute:${args.baseId}:${Date.now()}`, + removeOnComplete: 1000, + removeOnFail: 1000, + }); + } + + private buildPropertyLookup(props: BaseProperty[]): ReadonlyMap { + return new Map(props.map((p) => [p.id, { + id: p.id, + type: p.type, + typeOptions: p.typeOptions, + }])); + } +} + +function asResultType(type: string): "number" | "string" | "boolean" | "date" | "null" { + if (type === "number") return "number"; + if (type === "text" || type === "url" || type === "email") return "string"; + if (type === "checkbox") return "boolean"; + if (type === "date" || type === "createdAt" || type === "lastEditedAt") return "date"; + if (type === "formula") return "number"; // overridden by the nested formula's own resultType at runtime + return "null"; +} diff --git a/apps/server/src/integrations/queue/constants/queue.constants.ts b/apps/server/src/integrations/queue/constants/queue.constants.ts index 5d29b6168..546553438 100644 --- a/apps/server/src/integrations/queue/constants/queue.constants.ts +++ b/apps/server/src/integrations/queue/constants/queue.constants.ts @@ -87,4 +87,5 @@ export enum QueueJob { BASE_TYPE_CONVERSION = 'base-type-conversion', BASE_CELL_GC = 'base-cell-gc', + BASE_FORMULA_RECOMPUTE = 'base-formula-recompute', } diff --git a/apps/server/src/integrations/queue/constants/queue.interface.ts b/apps/server/src/integrations/queue/constants/queue.interface.ts index 1b240218c..9ddc4d210 100644 --- a/apps/server/src/integrations/queue/constants/queue.interface.ts +++ b/apps/server/src/integrations/queue/constants/queue.interface.ts @@ -137,3 +137,20 @@ export interface IBaseCellGcJob { propertyId: string; workspaceId: string; } + +export interface IBaseFormulaRecomputeJob { + baseId: string; + workspaceId: string; + propertyIds: string[]; // formula properties to recompute + reason: + | 'formula_created' + | 'formula_edited' + | 'dep_type_changed' + | 'dep_deleted' + | 'bulk_import' + | 'manual'; + actorId?: string | null; + // When set, scope recompute to these row IDs instead of the whole base. + // Used by the bulk-write path (> FORMULA_INLINE_ROW_THRESHOLD). + rowIds?: string[]; +}