feat: pdf export queue

This commit is contained in:
Philipinho
2026-04-14 16:01:38 +01:00
parent c802d29b85
commit 4ec2e458f8
7 changed files with 125 additions and 16 deletions
@@ -6,6 +6,7 @@ export enum JwtType {
MFA_TOKEN = 'mfa_token', MFA_TOKEN = 'mfa_token',
API_KEY = 'api_key', API_KEY = 'api_key',
PDF_RENDER = 'pdf_render', PDF_RENDER = 'pdf_render',
PDF_EXPORT_DOWNLOAD = 'pdf_export_download',
} }
export type JwtPayload = { export type JwtPayload = {
sub: string; sub: string;
@@ -52,3 +53,9 @@ export type JwtPdfRenderPayload = {
workspaceId: string; workspaceId: string;
type: 'pdf_render'; type: 'pdf_render';
}; };
export type JwtPdfExportDownloadPayload = {
fileTaskId: string;
workspaceId: string;
type: 'pdf_export_download';
};
@@ -13,6 +13,7 @@ import {
JwtExchangePayload, JwtExchangePayload,
JwtMfaTokenPayload, JwtMfaTokenPayload,
JwtPayload, JwtPayload,
JwtPdfExportDownloadPayload,
JwtPdfRenderPayload, JwtPdfRenderPayload,
JwtType, JwtType,
} from '../dto/jwt-payload'; } from '../dto/jwt-payload';
@@ -128,6 +129,18 @@ export class TokenService {
return this.jwtService.sign(payload, { expiresIn: '60s' }); return this.jwtService.sign(payload, { expiresIn: '60s' });
} }
async generatePdfExportDownloadToken(
fileTaskId: string,
workspaceId: string,
): Promise<string> {
const payload: JwtPdfExportDownloadPayload = {
fileTaskId,
workspaceId,
type: JwtType.PDF_EXPORT_DOWNLOAD,
};
return this.jwtService.sign(payload, { expiresIn: '1h' });
}
async verifyJwt(token: string, tokenType: string) { async verifyJwt(token: string, tokenType: string) {
const payload = await this.jwtService.verifyAsync(token, { const payload = await this.jwtService.verifyAsync(token, {
secret: this.environmentService.getAppSecret(), secret: this.environmentService.getAppSecret(),
@@ -0,0 +1,32 @@
import { Kysely, sql } from 'kysely';
export async function up(db: Kysely<any>): Promise<void> {
await db.schema
.alterTable('file_tasks')
.addColumn('page_id', 'uuid', (col) =>
col.references('pages.id').onDelete('set null').ifNotExists(),
)
.execute();
await db.schema
.alterTable('file_tasks')
.addColumn('metadata', 'jsonb', (col) => col.ifNotExists())
.execute();
await db.schema
.createIndex('idx_file_tasks_page_export')
.ifNotExists()
.on('file_tasks')
.columns(['page_id', 'workspace_id'])
.where(sql.ref('type'), '=', 'export')
.where(sql.ref('deleted_at'), 'is', null)
.execute();
}
export async function down(db: Kysely<any>): Promise<void> {
await db.schema.dropIndex('idx_file_tasks_page_export').execute();
await db.schema.alterTable('file_tasks').dropColumn('page_id').execute();
await db.schema.alterTable('file_tasks').dropColumn('metadata').execute();
}
+2
View File
@@ -196,6 +196,8 @@ export interface FileTasks {
filePath: string; filePath: string;
fileSize: Int8 | null; fileSize: Int8 | null;
id: Generated<string>; id: Generated<string>;
metadata: Json | null;
pageId: string | null;
source: string | null; source: string | null;
spaceId: string | null; spaceId: string | null;
status: string | null; status: string | null;
@@ -5,6 +5,9 @@ import { QueueJob, QueueName } from 'src/integrations/queue/constants';
import { FileImportTaskService } from '../services/file-import-task.service'; import { FileImportTaskService } from '../services/file-import-task.service';
import { FileTaskStatus } from '../utils/file.utils'; import { FileTaskStatus } from '../utils/file.utils';
import { StorageService } from '../../storage/storage.service'; import { StorageService } from '../../storage/storage.service';
import { ModuleRef } from '@nestjs/core';
import { InjectKysely } from 'nestjs-kysely';
import { KyselyDB } from '@docmost/db/types/kysely.types';
@Processor(QueueName.FILE_TASK_QUEUE) @Processor(QueueName.FILE_TASK_QUEUE)
export class FileTaskProcessor extends WorkerHost implements OnModuleDestroy { export class FileTaskProcessor extends WorkerHost implements OnModuleDestroy {
@@ -13,6 +16,8 @@ export class FileTaskProcessor extends WorkerHost implements OnModuleDestroy {
constructor( constructor(
private readonly fileTaskService: FileImportTaskService, private readonly fileTaskService: FileImportTaskService,
private readonly storageService: StorageService, private readonly storageService: StorageService,
private readonly moduleRef: ModuleRef,
@InjectKysely() private readonly db: KyselyDB,
) { ) {
super(); super();
} }
@@ -23,8 +28,11 @@ export class FileTaskProcessor extends WorkerHost implements OnModuleDestroy {
case QueueJob.IMPORT_TASK: case QueueJob.IMPORT_TASK:
await this.fileTaskService.processZIpImport(job.data.fileTaskId); await this.fileTaskService.processZIpImport(job.data.fileTaskId);
break; break;
case QueueJob.EXPORT_TASK: case QueueJob.PDF_EXPORT_TASK:
// TODO: export task await this.processExportTask(job.data.fileTaskId);
break;
case QueueJob.PDF_EXPORT_CLEANUP:
await this.processExportCleanup();
break; break;
} }
} catch (err) { } catch (err) {
@@ -33,6 +41,24 @@ export class FileTaskProcessor extends WorkerHost implements OnModuleDestroy {
} }
} }
private getPdfExportService() {
// eslint-disable-next-line @typescript-eslint/no-require-imports
const PdfExportModule = require('./../../../ee/pdf-export/pdf-export.service');
return this.moduleRef.get(PdfExportModule.PdfExportService, {
strict: false,
});
}
private async processExportTask(fileTaskId: string): Promise<void> {
const pdfExportService = this.getPdfExportService();
await pdfExportService.generateAndStorePdf(fileTaskId);
}
private async processExportCleanup(): Promise<void> {
const pdfExportService = this.getPdfExportService();
await pdfExportService.cleanupExpiredExports();
}
@OnWorkerEvent('active') @OnWorkerEvent('active')
onActive(job: Job) { onActive(job: Job) {
this.logger.debug(`Processing ${job.name} job`); this.logger.debug(`Processing ${job.name} job`);
@@ -41,32 +67,39 @@ export class FileTaskProcessor extends WorkerHost implements OnModuleDestroy {
@OnWorkerEvent('failed') @OnWorkerEvent('failed')
async onFailed(job: Job) { async onFailed(job: Job) {
this.logger.error( this.logger.error(
`Error processing ${job.name} job. Import Task ID: ${job.data.fileTaskId}. Reason: ${job.failedReason}`, `Error processing ${job.name} job. File Task ID: ${job.data?.fileTaskId}. Reason: ${job.failedReason}`,
); );
await this.handleFailedJob(job); if (job.name === QueueJob.IMPORT_TASK) {
await this.handleFailedImportJob(job);
} else if (job.name === QueueJob.PDF_EXPORT_TASK) {
await this.handleFailedExportJob(job);
}
} }
@OnWorkerEvent('completed') @OnWorkerEvent('completed')
async onCompleted(job: Job) { async onCompleted(job: Job) {
this.logger.log( this.logger.log(
`Completed ${job.name} job for File task ID ${job.data.fileTaskId}`, `Completed ${job.name} job for File task ID ${job.data?.fileTaskId}`,
); );
try { if (job.name === QueueJob.IMPORT_TASK) {
const fileTask = await this.fileTaskService.getFileTask( try {
job.data.fileTaskId, const fileTask = await this.fileTaskService.getFileTask(
); job.data.fileTaskId,
if (fileTask) { );
await this.storageService.delete(fileTask.filePath); if (fileTask) {
this.logger.debug(`Deleted imported zip file: ${fileTask.filePath}`); await this.storageService.delete(fileTask.filePath);
this.logger.debug(`Deleted imported zip file: ${fileTask.filePath}`);
}
} catch (err) {
this.logger.error(`Failed to delete imported zip file:`, err);
} }
} catch (err) {
this.logger.error(`Failed to delete imported zip file:`, err);
} }
// Export tasks: do NOT delete the file on completion (kept for 24h cache)
} }
private async handleFailedJob(job: Job) { private async handleFailedImportJob(job: Job) {
try { try {
const fileTaskId = job.data.fileTaskId; const fileTaskId = job.data.fileTaskId;
const reason = job.failedReason || 'Unknown error'; const reason = job.failedReason || 'Unknown error';
@@ -86,6 +119,25 @@ export class FileTaskProcessor extends WorkerHost implements OnModuleDestroy {
} }
} }
private async handleFailedExportJob(job: Job) {
try {
const fileTaskId = job.data.fileTaskId;
const reason = job.failedReason || 'Unknown error';
await this.db
.updateTable('fileTasks')
.set({
status: FileTaskStatus.Failed,
errorMessage: reason,
updatedAt: new Date(),
})
.where('id', '=', fileTaskId)
.execute();
} catch (err) {
this.logger.error(err);
}
}
async onModuleDestroy(): Promise<void> { async onModuleDestroy(): Promise<void> {
if (this.worker) { if (this.worker) {
await this.worker.close(); await this.worker.close();
@@ -80,4 +80,7 @@ export enum QueueJob {
AUDIT_LOG = 'audit-log', AUDIT_LOG = 'audit-log',
AUDIT_CLEANUP = 'audit-cleanup', AUDIT_CLEANUP = 'audit-cleanup',
PDF_EXPORT_TASK = 'pdf-export-task',
PDF_EXPORT_CLEANUP = 'pdf-export-cleanup',
} }