feat(webhooks): scaffold feature flag, CASL subject, queue, and migration

This commit is contained in:
Philipinho
2026-05-15 01:36:05 +01:00
parent 0ae407839f
commit 63c1241125
10 changed files with 162 additions and 2 deletions
+1
View File
@@ -19,4 +19,5 @@ export const Feature = {
SHARING_CONTROLS: 'sharing:controls',
TEMPLATES: 'templates',
VIEWER_COMMENTS: 'comment:viewer',
WEBHOOKS: 'webhooks',
} as const;
+1
View File
@@ -20,6 +20,7 @@ export const Feature = {
VIEWER_COMMENTS: 'comment:viewer',
TEMPLATES: 'templates',
PDF_EXPORT: 'export:pdf',
WEBHOOKS: 'webhooks',
} as const;
export type FeatureKey = (typeof Feature)[keyof typeof Feature];
@@ -42,6 +42,7 @@ function buildWorkspaceOwnerAbility() {
can(WorkspaceCaslAction.Manage, WorkspaceCaslSubject.Attachment);
can(WorkspaceCaslAction.Manage, WorkspaceCaslSubject.API);
can(WorkspaceCaslAction.Manage, WorkspaceCaslSubject.Audit);
can(WorkspaceCaslAction.Manage, WorkspaceCaslSubject.Webhook);
return build();
}
@@ -58,6 +59,7 @@ function buildWorkspaceAdminAbility() {
can(WorkspaceCaslAction.Manage, WorkspaceCaslSubject.Member);
can(WorkspaceCaslAction.Manage, WorkspaceCaslSubject.Attachment);
can(WorkspaceCaslAction.Manage, WorkspaceCaslSubject.API);
can(WorkspaceCaslAction.Manage, WorkspaceCaslSubject.Webhook);
return build();
}
@@ -13,6 +13,7 @@ export enum WorkspaceCaslSubject {
Attachment = 'attachment',
API = 'api_key',
Audit = 'audit',
Webhook = 'webhook',
}
export type IWorkspaceAbility =
@@ -22,4 +23,5 @@ export type IWorkspaceAbility =
| [WorkspaceCaslAction, WorkspaceCaslSubject.Group]
| [WorkspaceCaslAction, WorkspaceCaslSubject.Attachment]
| [WorkspaceCaslAction, WorkspaceCaslSubject.API]
| [WorkspaceCaslAction, WorkspaceCaslSubject.Audit];
| [WorkspaceCaslAction, WorkspaceCaslSubject.Audit]
| [WorkspaceCaslAction, WorkspaceCaslSubject.Webhook];
@@ -0,0 +1,93 @@
import { Kysely, sql } from 'kysely';
export async function up(db: Kysely<any>): Promise<void> {
await db.schema
.createTable('webhooks')
.ifNotExists()
.addColumn('id', 'uuid', (col) =>
col.primaryKey().defaultTo(sql`gen_uuid_v7()`),
)
.addColumn('workspace_id', 'uuid', (col) =>
col.notNull().references('workspaces.id').onDelete('cascade'),
)
.addColumn('name', 'varchar', (col) => col.notNull())
.addColumn('url', 'text', (col) => col.notNull())
.addColumn('signing_secret', 'text', (col) => col.notNull())
.addColumn('subscribed_events', 'jsonb', (col) =>
col.notNull().defaultTo(sql`'[]'::jsonb`),
)
.addColumn('is_active', 'boolean', (col) =>
col.notNull().defaultTo(true),
)
.addColumn('consecutive_failure_count', 'integer', (col) =>
col.notNull().defaultTo(0),
)
.addColumn('disabled_at', 'timestamptz')
.addColumn('creator_id', 'uuid', (col) =>
col.references('users.id').onDelete('set null'),
)
.addColumn('created_at', 'timestamptz', (col) =>
col.notNull().defaultTo(sql`now()`),
)
.addColumn('updated_at', 'timestamptz', (col) =>
col.notNull().defaultTo(sql`now()`),
)
.execute();
await db.schema
.createIndex('idx_webhooks_workspace_id')
.ifNotExists()
.on('webhooks')
.columns(['workspace_id', 'id desc'])
.execute();
await db.schema
.createIndex('idx_webhooks_subscribed_events')
.ifNotExists()
.on('webhooks')
.using('gin')
.column('subscribed_events')
.execute();
await db.schema
.createTable('webhook_deliveries')
.ifNotExists()
.addColumn('id', 'uuid', (col) =>
col.primaryKey().defaultTo(sql`gen_uuid_v7()`),
)
.addColumn('webhook_id', 'uuid', (col) =>
col.notNull().references('webhooks.id').onDelete('cascade'),
)
.addColumn('workspace_id', 'uuid', (col) =>
col.notNull().references('workspaces.id').onDelete('cascade'),
)
.addColumn('event', 'varchar', (col) => col.notNull())
.addColumn('payload', 'jsonb', (col) => col.notNull())
.addColumn('status', 'varchar', (col) =>
col.notNull().defaultTo('pending'),
)
.addColumn('http_status', 'integer')
.addColumn('response_body', 'text')
.addColumn('error_message', 'text')
.addColumn('attempt_count', 'integer', (col) =>
col.notNull().defaultTo(0),
)
.addColumn('duration_ms', 'integer')
.addColumn('delivered_at', 'timestamptz')
.addColumn('created_at', 'timestamptz', (col) =>
col.notNull().defaultTo(sql`now()`),
)
.execute();
await db.schema
.createIndex('idx_webhook_deliveries_webhook_id')
.ifNotExists()
.on('webhook_deliveries')
.columns(['webhook_id', 'id desc'])
.execute();
}
export async function down(db: Kysely<any>): Promise<void> {
await db.schema.dropTable('webhook_deliveries').execute();
await db.schema.dropTable('webhooks').execute();
}
+33
View File
@@ -589,6 +589,37 @@ export interface UserSessions {
createdAt: Generated<Timestamp>;
}
export interface Webhooks {
id: Generated<string>;
workspaceId: string;
name: string;
url: string;
signingSecret: string;
subscribedEvents: Generated<Json>;
isActive: Generated<boolean>;
consecutiveFailureCount: Generated<number>;
disabledAt: Timestamp | null;
creatorId: string | null;
createdAt: Generated<Timestamp>;
updatedAt: Generated<Timestamp>;
}
export interface WebhookDeliveries {
id: Generated<string>;
webhookId: string;
workspaceId: string;
event: string;
payload: Json;
status: Generated<string>;
httpStatus: number | null;
responseBody: string | null;
errorMessage: string | null;
attemptCount: Generated<number>;
durationMs: number | null;
deliveredAt: Timestamp | null;
createdAt: Generated<Timestamp>;
}
export interface DB {
aiChats: AiChats;
aiChatMessages: AiChatMessages;
@@ -625,6 +656,8 @@ export interface DB {
userSessions: UserSessions;
userTokens: UserTokens;
watchers: Watchers;
webhooks: Webhooks;
webhookDeliveries: WebhookDeliveries;
workspaceInvitations: WorkspaceInvitations;
workspaces: Workspaces;
}
@@ -37,6 +37,8 @@ import {
Watchers,
Audit as _Audit,
Templates,
Webhooks,
WebhookDeliveries,
} from './db';
import { PageEmbeddings } from '@docmost/db/types/embeddings.types';
@@ -238,3 +240,13 @@ export type UpdatableAudit = Updateable<Omit<_Audit, 'id'>>;
export type Template = Selectable<Templates>;
export type InsertableTemplate = Insertable<Templates>;
export type UpdatableTemplate = Updateable<Omit<Templates, 'id'>>;
// Webhook
export type Webhook = Selectable<Webhooks>;
export type InsertableWebhook = Insertable<Webhooks>;
export type UpdatableWebhook = Updateable<Omit<Webhooks, 'id'>>;
// Webhook delivery
export type WebhookDelivery = Selectable<WebhookDeliveries>;
export type InsertableWebhookDelivery = Insertable<WebhookDeliveries>;
export type UpdatableWebhookDelivery = Updateable<Omit<WebhookDeliveries, 'id'>>;
@@ -9,6 +9,7 @@ export enum QueueName {
HISTORY_QUEUE = '{history-queue}',
NOTIFICATION_QUEUE = '{notification-queue}',
AUDIT_QUEUE = '{audit-queue}',
WEBHOOK_QUEUE = '{webhook-queue}',
}
export enum QueueJob {
@@ -83,4 +84,7 @@ export enum QueueJob {
PDF_EXPORT_TASK = 'pdf-export-task',
PDF_EXPORT_CLEANUP = 'pdf-export-cleanup',
WEBHOOK_DELIVERY = 'webhook-delivery',
WEBHOOK_DELIVERY_CLEANUP = 'webhook-delivery-cleanup',
}
@@ -92,6 +92,18 @@ import { GeneralQueueProcessor } from './processors/general-queue.processor';
attempts: 3,
},
}),
BullModule.registerQueue({
name: QueueName.WEBHOOK_QUEUE,
defaultJobOptions: {
attempts: 5,
backoff: {
type: 'exponential',
delay: 10 * 1000,
},
removeOnComplete: { count: 200 },
removeOnFail: { count: 200 },
},
}),
],
exports: [BullModule],
providers: [GeneralQueueProcessor],