diff --git a/apps/server/src/common/helpers/utils.ts b/apps/server/src/common/helpers/utils.ts index 313f2358..7c94bb48 100644 --- a/apps/server/src/common/helpers/utils.ts +++ b/apps/server/src/common/helpers/utils.ts @@ -2,6 +2,7 @@ import * as path from 'path'; import * as bcrypt from 'bcrypt'; import { sanitize } from 'sanitize-filename-ts'; import { FastifyRequest } from 'fastify'; +import { Readable, Transform } from 'stream'; export const envPath = path.resolve(process.cwd(), '..', '..', '.env'); @@ -118,3 +119,18 @@ export function normalizePostgresUrl(url: string): string { parsed.search = newParams.toString(); return parsed.toString(); } + +export function createByteCountingStream(source: Readable) { + let bytesRead = 0; + const stream = new Transform({ + transform(chunk, encoding, callback) { + bytesRead += chunk.length; + callback(null, chunk); + }, + }); + + source.pipe(stream); + source.on('error', (err) => stream.emit('error', err)); + + return { stream, getBytesRead: () => bytesRead }; +} diff --git a/apps/server/src/core/attachment/attachment.utils.ts b/apps/server/src/core/attachment/attachment.utils.ts index ee72dc9f..23512002 100644 --- a/apps/server/src/core/attachment/attachment.utils.ts +++ b/apps/server/src/core/attachment/attachment.utils.ts @@ -5,15 +5,17 @@ import { sanitizeFileName } from '../../common/helpers'; import * as sharp from 'sharp'; export interface PreparedFile { - buffer: Buffer; + buffer?: Buffer; fileName: string; fileSize: number; fileExtension: string; mimeType: string; + multiPartFile?: MultipartFile; } export async function prepareFile( filePromise: Promise, + options: { skipBuffer?: boolean } = {}, ): Promise { const file = await filePromise; @@ -22,10 +24,16 @@ export async function prepareFile( } try { - const buffer = await file.toBuffer(); + let buffer: Buffer | undefined; + let fileSize = 0; + + if (!options.skipBuffer) { + buffer = await file.toBuffer(); + fileSize = buffer.length; + } + const sanitizedFilename = sanitizeFileName(file.filename); const fileName = sanitizedFilename.slice(0, 255); - const fileSize = buffer.length; const fileExtension = path.extname(file.filename).toLowerCase(); return { @@ -34,6 +42,7 @@ export async function prepareFile( fileSize, fileExtension, mimeType: file.mimetype, + multiPartFile: file, }; } catch (error) { throw error; diff --git a/apps/server/src/core/attachment/services/attachment.service.ts b/apps/server/src/core/attachment/services/attachment.service.ts index 77a044a2..ea94b983 100644 --- a/apps/server/src/core/attachment/services/attachment.service.ts +++ b/apps/server/src/core/attachment/services/attachment.service.ts @@ -4,6 +4,7 @@ import { Logger, NotFoundException, } from '@nestjs/common'; +import { Readable } from 'stream'; import { StorageService } from '../../../integrations/storage/storage.service'; import { MultipartFile } from '@fastify/multipart'; import { @@ -26,6 +27,7 @@ import { SpaceRepo } from '@docmost/db/repos/space/space.repo'; import { InjectQueue } from '@nestjs/bullmq'; import { QueueJob, QueueName } from '../../../integrations/queue/constants'; import { Queue } from 'bullmq'; +import { createByteCountingStream } from '../../../common/helpers/utils'; @Injectable() export class AttachmentService { @@ -49,7 +51,9 @@ export class AttachmentService { attachmentId?: string; }) { const { filePromise, pageId, spaceId, userId, workspaceId } = opts; - const preparedFile: PreparedFile = await prepareFile(filePromise); + const preparedFile: PreparedFile = await prepareFile(filePromise, { + skipBuffer: true, + }); let isUpdate = false; let attachmentId = null; @@ -81,7 +85,14 @@ export class AttachmentService { const filePath = `${getAttachmentFolderPath(AttachmentType.File, workspaceId)}/${attachmentId}/${preparedFile.fileName}`; - await this.uploadToDrive(filePath, preparedFile.buffer); + const { stream, getBytesRead } = createByteCountingStream( + preparedFile.multiPartFile.file, + ); + + await this.uploadToDrive(filePath, stream); + + // Update fileSize from the consumed stream + preparedFile.fileSize = getBytesRead(); let attachment: Attachment = null; try { @@ -142,7 +153,10 @@ export class AttachmentService { const preparedFile: PreparedFile = await prepareFile(filePromise); validateFileType(preparedFile.fileExtension, validImageExtensions); - const processedBuffer = await compressAndResizeIcon(preparedFile.buffer, type); + const processedBuffer = await compressAndResizeIcon( + preparedFile.buffer, + type, + ); preparedFile.buffer = processedBuffer; preparedFile.fileSize = processedBuffer.length; preparedFile.fileName = uuid4() + preparedFile.fileExtension; @@ -232,9 +246,9 @@ export class AttachmentService { } } - async uploadToDrive(filePath: string, fileBuffer: any) { + async uploadToDrive(filePath: string, fileContent: Buffer | Readable) { try { - await this.storageService.upload(filePath, fileBuffer); + await this.storageService.upload(filePath, fileContent); } catch (err) { this.logger.error('Error uploading file to drive:', err); throw new BadRequestException('Error uploading file to drive'); diff --git a/apps/server/src/integrations/import/services/import.service.ts b/apps/server/src/integrations/import/services/import.service.ts index 7901122a..aeeebcee 100644 --- a/apps/server/src/integrations/import/services/import.service.ts +++ b/apps/server/src/integrations/import/services/import.service.ts @@ -10,7 +10,11 @@ import { } from '../../../collaboration/collaboration.util'; import { InjectKysely } from 'nestjs-kysely'; import { KyselyDB } from '@docmost/db/types/kysely.types'; -import { generateSlugId, sanitizeFileName } from '../../../common/helpers'; +import { + generateSlugId, + sanitizeFileName, + createByteCountingStream, +} from '../../../common/helpers'; import { generateJitteredKeyBetween } from 'fractional-indexing-jittered'; import { TiptapTransformer } from '@hocuspocus/transformer'; import * as Y from 'yjs'; @@ -173,15 +177,24 @@ export class ImportService { }; } - async getNewPagePosition(spaceId: string): Promise { - const lastPage = await this.db + async getNewPagePosition( + spaceId: string, + parentPageId?: string, + ): Promise { + let query = this.db .selectFrom('pages') .select(['id', 'position']) .where('spaceId', '=', spaceId) .orderBy('position', (ob) => ob.collate('C').desc()) - .limit(1) - .where('parentPageId', 'is', null) - .executeTakeFirst(); + .limit(1); + + if (parentPageId) { + query = query.where('parentPageId', '=', parentPageId); + } else { + query = query.where('parentPageId', 'is', null); + } + + const lastPage = await query.executeTakeFirst(); if (lastPage) { return generateJitteredKeyBetween(lastPage.position, null); @@ -198,20 +211,21 @@ export class ImportService { workspaceId: string, ) { const file = await filePromise; - const fileBuffer = await file.toBuffer(); const fileExtension = path.extname(file.filename).toLowerCase(); const fileName = sanitizeFileName( path.basename(file.filename, fileExtension), ); - const fileSize = fileBuffer.length; - const fileNameWithExt = fileName + fileExtension; const fileTaskId = uuid7(); const filePath = `${getFileTaskFolderPath(FileTaskType.Import, workspaceId)}/${fileTaskId}/${fileNameWithExt}`; // upload file - await this.storageService.upload(filePath, fileBuffer); + const { stream, getBytesRead } = createByteCountingStream(file.file); + + await this.storageService.upload(filePath, stream); + + const fileSize = getBytesRead(); const fileTask = await this.db .insertInto('fileTasks') diff --git a/apps/server/src/integrations/storage/drivers/local.driver.ts b/apps/server/src/integrations/storage/drivers/local.driver.ts index 5171066c..aada2c05 100644 --- a/apps/server/src/integrations/storage/drivers/local.driver.ts +++ b/apps/server/src/integrations/storage/drivers/local.driver.ts @@ -20,9 +20,15 @@ export class LocalDriver implements StorageDriver { return join(this.config.storagePath, filePath); } - async upload(filePath: string, file: Buffer): Promise { + async upload(filePath: string, file: Buffer | Readable): Promise { try { - await fs.outputFile(this._fullPath(filePath), file); + const fullPath = this._fullPath(filePath); + if (file instanceof Buffer) { + await fs.outputFile(fullPath, file); + } else { + await fs.mkdir(dirname(fullPath), { recursive: true }); + await pipeline(file, createWriteStream(fullPath)); + } } catch (err) { throw new Error(`Failed to upload file: ${(err as Error).message}`); } @@ -42,7 +48,7 @@ export class LocalDriver implements StorageDriver { try { const fromFullPath = this._fullPath(fromFilePath); const toFullPath = this._fullPath(toFilePath); - + if (await this.exists(fromFilePath)) { await fs.copy(fromFullPath, toFullPath); } diff --git a/apps/server/src/integrations/storage/drivers/s3.driver.ts b/apps/server/src/integrations/storage/drivers/s3.driver.ts index f6d48677..ed44fded 100644 --- a/apps/server/src/integrations/storage/drivers/s3.driver.ts +++ b/apps/server/src/integrations/storage/drivers/s3.driver.ts @@ -23,19 +23,21 @@ export class S3Driver implements StorageDriver { this.s3Client = new S3Client(config as any); } - async upload(filePath: string, file: Buffer): Promise { + async upload(filePath: string, file: Buffer | Readable): Promise { try { const contentType = getMimeType(filePath); - const command = new PutObjectCommand({ - Bucket: this.config.bucket, - Key: filePath, - Body: file, - ContentType: contentType, - // ACL: "public-read", + const upload = new Upload({ + client: this.s3Client, + params: { + Bucket: this.config.bucket, + Key: filePath, + Body: file, + ContentType: contentType, + }, }); - await this.s3Client.send(command); + await upload.done(); } catch (err) { throw new Error(`Failed to upload file: ${(err as Error).message}`); } diff --git a/apps/server/src/integrations/storage/interfaces/storage-driver.interface.ts b/apps/server/src/integrations/storage/interfaces/storage-driver.interface.ts index 22a86d2b..f376c56f 100644 --- a/apps/server/src/integrations/storage/interfaces/storage-driver.interface.ts +++ b/apps/server/src/integrations/storage/interfaces/storage-driver.interface.ts @@ -1,7 +1,7 @@ import { Readable } from 'stream'; export interface StorageDriver { - upload(filePath: string, file: Buffer): Promise; + upload(filePath: string, file: Buffer | Readable): Promise; uploadStream(filePath: string, file: Readable, options?: { recreateClient?: boolean }): Promise; diff --git a/apps/server/src/integrations/storage/storage.service.ts b/apps/server/src/integrations/storage/storage.service.ts index d796351b..3ed887af 100644 --- a/apps/server/src/integrations/storage/storage.service.ts +++ b/apps/server/src/integrations/storage/storage.service.ts @@ -8,9 +8,9 @@ export class StorageService { private readonly logger = new Logger(StorageService.name); constructor( @Inject(STORAGE_DRIVER_TOKEN) private storageDriver: StorageDriver, - ) {} + ) { } - async upload(filePath: string, fileContent: Buffer | any) { + async upload(filePath: string, fileContent: Buffer | Readable) { await this.storageDriver.upload(filePath, fileContent); this.logger.debug(`File uploaded successfully. Path: ${filePath}`); }