From 0a955e21b6a074b80e6feb851cfa155f8480ea61 Mon Sep 17 00:00:00 2001
From: mertalev <101130780+mertalev@users.noreply.github.com>
Date: Wed, 1 Oct 2025 14:41:21 -0400
Subject: [PATCH] tweaks

shared pipe method

require size upfront

make length optional for patch requests
---
 e2e/src/api/specs/asset-upload.e2e-spec.ts  |  15 +-
 server/src/dtos/upload.dto.ts               |   9 +-
 server/src/repositories/asset.repository.ts |  48 ++---
 server/src/services/asset-upload.service.ts | 228 +++++++++-----------
 4 files changed, 138 insertions(+), 162 deletions(-)
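Notes: the three tweaks above hang together. POST /upload and PATCH /upload/:id now stream the
request body through one shared, bounded piping helper; the client must declare Upload-Length
when creating an upload; and Upload-Length becomes optional on PATCH, where Upload-Offset is
what is required. The gist of the shared helper, as a standalone sketch (illustrative only; the
real implementation is the `pipe()` method in asset-upload.service.ts below, which also sends
problem responses and cancels the stored asset):

    import { Readable, Writable } from 'node:stream';

    // Copy at most `size` bytes from the request into the file stream, with
    // backpressure (pause/resume around 'drain') and cancellation when the
    // body does not match the declared length.
    function boundedPipe(req: Readable, out: Writable, size: number, onMismatch: () => void) {
      let received = 0;
      req.on('data', (chunk: Buffer) => {
        if (received + chunk.length > size) {
          out.destroy();
          req.destroy();
          return onMismatch();
        }
        received += chunk.length;
        if (!out.write(chunk)) {
          req.pause();
          out.once('drain', () => req.resume());
        }
      });
      req.on('end', () => {
        if (received === size) {
          out.end();
        } else {
          out.destroy();
          onMismatch();
        }
      });
    }
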
diff --git a/e2e/src/api/specs/asset-upload.e2e-spec.ts b/e2e/src/api/specs/asset-upload.e2e-spec.ts
index f367bda0ff..63e2d02808 100644
--- a/e2e/src/api/specs/asset-upload.e2e-spec.ts
+++ b/e2e/src/api/specs/asset-upload.e2e-spec.ts
@@ -38,11 +38,11 @@ describe('/upload (RUFH compliance)', () => {
       .set('Repr-Digest', `sha=:${createHash('sha1').update(content).digest('base64')}:`)
       .set('Upload-Complete', '?1')
       .set('Content-Type', 'image/jpeg')
+      .set('Upload-Length', '1024')
       .send(content);

     expect(status).toBe(200);
     expect(headers['upload-complete']).toBe('?1');
-    expect(headers['upload-limit']).toEqual('min-size=0');
   });

   it('should create an incomplete upload with Upload-Complete: ?0', async () => {
@@ -55,11 +55,11 @@ describe('/upload (RUFH compliance)', () => {
       .set('X-Immich-Asset-Data', base64Metadata)
       .set('Repr-Digest', `sha=:${createHash('sha1').update(partialContent).digest('base64')}:`)
       .set('Upload-Complete', '?0')
-      .set('Content-Length', partialContent.length.toString())
+      .set('Content-Length', '512')
+      .set('Upload-Length', '513')
       .send(partialContent);

     expect(status).toBe(201);
-    expect(headers['upload-limit']).toEqual('min-size=0');
     expect(headers['location']).toMatch(/^\/api\/upload\/[a-zA-Z0-9\-]+$/);
   });
 });
@@ -77,6 +77,7 @@
       .set('X-Immich-Asset-Data', base64Metadata)
       .set('Repr-Digest', `sha=:${createHash('sha1').update(content).digest('base64')}:`)
       .set('Upload-Complete', '?0')
+      .set('Upload-Length', '512')
       .send(content);

     expect(headers['location']).toBeDefined();
@@ -130,13 +131,14 @@
       .set('X-Immich-Asset-Data', base64Metadata)
       .set('Repr-Digest', `sha=:${createHash('sha1').update(fullContent).digest('base64')}:`)
       .set('Upload-Complete', '?0')
+      .set('Upload-Length', '2750')
       .send(chunks[0]);

     uploadResource = response.headers['location'];
   });

   it('should append data with correct offset', async () => {
-    const { status, headers, body } = await request(baseUrl)
+    const { status, headers } = await request(baseUrl)
       .patch(uploadResource)
       .set('Authorization', `Bearer ${user.accessToken}`)
       .set('Upload-Draft-Interop-Version', '8')
@@ -233,6 +235,7 @@
       .set('X-Immich-Asset-Data', base64Metadata)
       .set('Repr-Digest', `sha=:${createHash('sha1').update(content).digest('base64')}:`)
       .set('Upload-Complete', '?0')
+      .set('Upload-Length', '200')
       .send(content);

     uploadResource = response.headers['location'];
@@ -269,6 +272,7 @@
       .set('X-Immich-Asset-Data', base64Metadata)
       .set('Repr-Digest', `sha=:${createHash('sha1').update(totalContent).digest('base64')}:`)
       .set('Upload-Complete', '?0') // Indicate incomplete
+      .set('Upload-Length', '5000')
       .send(firstPart);

     expect(initialResponse.status).toBe(201);
@@ -310,6 +314,7 @@
       .set('X-Immich-Asset-Data', base64Metadata)
       .set('Repr-Digest', `sha=:${hash.digest('base64')}:`)
       .set('Upload-Complete', '?0')
+      .set('Upload-Length', '10000')
       .send(chunks[0]);

     const uploadResource = createResponse.headers['location'];
@@ -363,7 +368,7 @@
       .set('Repr-Digest', `sha=:${createHash('sha1').update(content).digest('base64')}:`)
       .set('Upload-Complete', '?1')
       .set('Upload-Length', '2000') // Doesn't match content length
-      .set('Content-Length', content.length.toString())
+      .set('Content-Length', '1000')
       .send(content);

     expect(status).toBe(400);
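A note on the digest header these tests construct: Repr-Digest (RFC 9530) carries a
structured-field dictionary whose values are byte sequences, i.e. base64 wrapped in colons
(RFC 8941). A small helper equivalent to what the specs inline above (sketch; SHA-1 is the
algorithm the upload flow compares against):

    import { createHash } from 'node:crypto';

    // Build a Repr-Digest value such as `sha=:3oFxT...=:` for a request body.
    function reprDigestSha1(content: Buffer): string {
      return `sha=:${createHash('sha1').update(content).digest('base64')}:`;
    }

    // Usage in a test: .set('Repr-Digest', reprDigestSha1(content))
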
diff --git a/server/src/dtos/upload.dto.ts b/server/src/dtos/upload.dto.ts
index 936e2e6d28..67e6f851eb 100644
--- a/server/src/dtos/upload.dto.ts
+++ b/server/src/dtos/upload.dto.ts
@@ -140,6 +140,12 @@ export class StartUploadDto extends BaseUploadHeadersDto {
     throw new BadRequestException(`Invalid ${UploadHeader.ReprDigest} header`);
   })
   checksum!: Buffer;
+
+  @Expose({ name: UploadHeader.UploadLength })
+  @Min(0)
+  @IsInt()
+  @Type(() => Number)
+  declare uploadLength: number;
 }

 export class ResumeUploadDto extends BaseUploadHeadersDto {
@@ -152,8 +158,7 @@ export class ResumeUploadDto extends BaseUploadHeadersDto {
   @Min(0)
   @IsInt()
   @Type(() => Number)
-  @Optional()
-  uploadOffset!: number | null;
+  uploadOffset!: number;
 }

 export class GetUploadStatusDto extends BaseRufhHeadersDto {}
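For context, these DTOs are populated from request headers rather than a JSON body. The snippet
below is a self-contained approximation of that header-backed pattern (assumed names — the real
base classes and the UploadHeader enum live elsewhere in the codebase, and Express lower-cases
incoming header names):

    import { Expose, plainToInstance, Type } from 'class-transformer';
    import { IsInt, Min, validateSync } from 'class-validator';

    class UploadHeaders {
      // Map the raw header onto a typed, validated property.
      @Expose({ name: 'upload-length' })
      @Min(0)
      @IsInt()
      @Type(() => Number)
      uploadLength!: number;
    }

    const dto = plainToInstance(UploadHeaders, { 'upload-length': '1024' }, { excludeExtraneousValues: true });
    console.log(dto.uploadLength, validateSync(dto)); // 1024, []
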
diff --git a/server/src/repositories/asset.repository.ts b/server/src/repositories/asset.repository.ts
index 3b4aea42c5..acf656e448 100644
--- a/server/src/repositories/asset.repository.ts
+++ b/server/src/repositories/asset.repository.ts
@@ -253,37 +253,13 @@ export class AssetRepository {
     return this.db.insertInto('asset').values(asset).returningAll().executeTakeFirstOrThrow();
   }

-  createWithMetadata(asset: Insertable<AssetTable> & { id: string }, metadata?: AssetMetadataItem[]) {
+  createWithMetadata(asset: Insertable<AssetTable> & { id: string }, size: number, metadata?: AssetMetadataItem[]) {
     if (!metadata || metadata.length === 0) {
       return this.db.insertInto('asset').values(asset).execute();
     }

     return this.db
       .with('asset', (qb) => qb.insertInto('asset').values(asset).returning('id'))
-      .insertInto('asset_metadata')
-      .values(metadata.map(({ key, value }) => ({ assetId: asset.id, key, value })))
-      .execute();
-  }
-
-  getCompletionMetadata(assetId: string, ownerId: string) {
-    return this.db
-      .selectFrom('asset')
-      .select(['originalPath as path', 'status', 'fileModifiedAt', 'createdAt', 'checksum'])
-      .where('id', '=', assetId)
-      .where('ownerId', '=', ownerId)
-      .executeTakeFirst();
-  }
-
-  setCompleteWithSize(assetId: string, size: number) {
-    return this.db
-      .with('asset', (qb) =>
-        qb
-          .updateTable('asset')
-          .set({ status: AssetStatus.Active })
-          .where('asset.id', '=', assetId)
-          .where('asset.status', '=', sql.lit(AssetStatus.Partial))
-          .returning(['asset.id', 'asset.ownerId']),
-      )
       .with('exif', (qb) =>
         qb
           .insertInto('asset_exif')
@@ -297,7 +273,27 @@ export class AssetRepository {
           .set({ quotaUsageInBytes: sql`"quotaUsageInBytes" + ${size}` })
           .whereRef('user.id', '=', 'asset.ownerId'),
       )
-      .selectNoFrom(sql`1`.as('dummy'))
+      .insertInto('asset_metadata')
+      .values(metadata.map(({ key, value }) => ({ assetId: asset.id, key, value })))
+      .execute();
+  }
+
+  getCompletionMetadata(assetId: string, ownerId: string) {
+    return this.db
+      .selectFrom('asset')
+      .innerJoin('asset_exif', 'asset.id', 'asset_exif.assetId')
+      .select(['originalPath as path', 'status', 'fileModifiedAt', 'createdAt', 'checksum', 'fileSizeInByte as size'])
+      .where('id', '=', assetId)
+      .where('ownerId', '=', ownerId)
+      .executeTakeFirst();
+  }
+
+  setCompleteWithSize(assetId: string) {
+    return this.db
+      .updateTable('asset')
+      .set({ status: AssetStatus.Active })
+      .where('asset.id', '=', assetId)
+      .where('asset.status', '=', sql.lit(AssetStatus.Partial))
       .execute();
   }
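The repository change folds what used to be a completion-time update into asset creation: one
statement now inserts the asset, writes its exif row with the upfront size, bumps the owner's
quota, and stores the key/value metadata, which is why setCompleteWithSize() shrinks to a plain
status flip. Postgres runs data-modifying CTEs like this as a single atomic statement. A
condensed sketch of the pattern (schema typing loosened to keep it short; the quota CTE name is
illustrative):

    import { Kysely, sql } from 'kysely';

    function createWithMetadataSketch(
      db: Kysely<any>,
      asset: { id: string; ownerId: string },
      size: number,
      metadata: { key: string; value: string }[],
    ) {
      return db
        // Insert the asset and expose its id to the following CTEs.
        .with('asset', (qb) => qb.insertInto('asset').values(asset).returning('id'))
        // Record the file size up front instead of at completion time.
        .with('exif', (qb) => qb.insertInto('asset_exif').values({ assetId: asset.id, fileSizeInByte: size }))
        // Charge the owner's quota in the same round trip.
        .with('quota', (qb) =>
          qb
            .updateTable('user')
            .set({ quotaUsageInBytes: sql`"quotaUsageInBytes" + ${size}` })
            .where('user.id', '=', asset.ownerId),
        )
        .insertInto('asset_metadata')
        .values(metadata.map(({ key, value }) => ({ assetId: asset.id, key, value })))
        .execute();
    }
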
diff --git a/server/src/services/asset-upload.service.ts b/server/src/services/asset-upload.service.ts
index 640b738933..eaa6fa655d 100644
--- a/server/src/services/asset-upload.service.ts
+++ b/server/src/services/asset-upload.service.ts
@@ -1,7 +1,8 @@
-import { BadRequestException, Injectable, InternalServerErrorException } from '@nestjs/common';
+import { BadRequestException, Injectable } from '@nestjs/common';
 import { Response } from 'express';
 import { createHash } from 'node:crypto';
 import { extname, join } from 'node:path';
+import { Readable } from 'node:stream';
 import { StorageCore } from 'src/cores/storage.core';
 import { AuthDto } from 'src/dtos/auth.dto';
 import { GetUploadStatusDto, ResumeUploadDto, StartUploadDto } from 'src/dtos/upload.dto';
@@ -19,7 +20,7 @@ export class AssetUploadService extends BaseService {
   async startUpload(req: AuthenticatedRequest, res: Response, dto: StartUploadDto): Promise<void> {
     this.logger.verboseFn(() => `Starting upload: ${JSON.stringify(dto)}`);
     const { isComplete, assetData, uploadLength, contentLength, version } = dto;
-    if (isComplete && uploadLength !== undefined && uploadLength !== contentLength) {
+    if (isComplete && uploadLength && uploadLength !== contentLength) {
       return this.sendInconsistentLengthProblem(res);
     }
@@ -48,20 +49,22 @@ export class AssetUploadService extends BaseService {
          fileCreatedAt: assetData.fileCreatedAt,
          fileModifiedAt: assetData.fileModifiedAt,
          localDateTime: assetData.fileCreatedAt,
-          type: mimeTypes.assetType(path),
+          type: type,
          isFavorite: assetData.isFavorite,
          duration: assetData.duration || null,
          visibility: assetData.visibility || AssetVisibility.Timeline,
          originalFileName: assetData.filename,
          status: AssetStatus.Partial,
        },
+        uploadLength,
        assetData.metadata,
      );
    } catch (error: any) {
      if (isAssetChecksumConstraint(error)) {
        const duplicate = await this.assetRepository.getUploadAssetIdByChecksum(req.auth.user.id, dto.checksum);
        if (!duplicate) {
-          throw new InternalServerErrorException('Error locating duplicate for checksum constraint');
+          res.status(500).send('Error locating duplicate for checksum constraint');
+          return;
        }

        if (duplicate.status !== AssetStatus.Partial) {
@@ -83,68 +86,31 @@ export class AssetUploadService extends BaseService {
     await this.storageRepository.mkdir(folder);

     let checksumBuffer: Buffer | undefined;
-    const writeStream = this.storageRepository.createWriteStream(path);
+    const metadata = { id: assetId, path, size: contentLength, fileModifiedAt: assetData.fileModifiedAt };
+    const writeStream = this.pipe(req, res, metadata);
     if (isComplete) {
       const hash = createHash('sha1');
-      req.on('data', (chunk: Buffer) => hash.update(chunk));
+      req.on('data', (data: Buffer) => hash.update(data));
       writeStream.on('finish', () => (checksumBuffer = hash.digest()));
     }

-    writeStream.on('error', (error) => {
-      this.logger.error(`Failed to write chunk to ${path}: ${error.message}`);
-      if (!res.headersSent) {
-        res.status(500).setHeader('Location', location).send();
-      }
-    });
-
-    writeStream.on('finish', async () => {
+    writeStream.on('finish', () => {
+      this.setCompleteHeader(res, dto.version, isComplete);
       if (!isComplete) {
-        return res.status(201).setHeader('Location', location).setHeader('Upload-Limit', 'min-size=0').send();
+        return res.status(201).send();
       }

       this.logger.log(`Finished upload to ${path}`);
       if (dto.checksum.compare(checksumBuffer!) !== 0) {
         return this.sendChecksumMismatchResponse(res, assetId, path);
       }
-      try {
-        await this.onComplete({ assetId, path, size: contentLength, fileModifiedAt: assetData.fileModifiedAt });
-      } finally {
-        this.setCompleteHeader(res, dto.version, true);
-        res.status(200).setHeader('Location', location).setHeader('Upload-Limit', 'min-size=0').send();
-      }
-    });
-
-    req.on('error', (error) => {
-      this.logger.error(`Failed to read request body: ${error.message}`);
-      writeStream.end();
-      if (!res.headersSent) {
-        res.status(500).setHeader('Location', location).send();
-      }
-    });
-
-    let receivedLength = 0;
-    req.on('data', (chunk: Buffer) => {
-      if (receivedLength + chunk.length > contentLength) {
-        writeStream.destroy();
-        req.destroy();
-        res.status(400).send('Received more data than specified in content-length');
-        return this.onCancel(assetId, path);
-      }
-      receivedLength += chunk.length;
-      if (!writeStream.write(chunk)) {
-        req.pause();
-        writeStream.once('drain', () => req.resume());
-      }
-    });
-
-    req.on('end', () => {
-      if (receivedLength === contentLength) {
-        return writeStream.end();
-      }
-      this.logger.error(`Received ${receivedLength} bytes when expecting ${contentLength} for ${assetId}`);
-      writeStream.destroy();
-      this.onCancel(assetId, path);
+      this.onComplete(metadata)
+        .then(() => res.status(200).send())
+        .catch((error) => {
+          this.logger.error(`Failed to complete upload for ${assetId}: ${error.message}`);
+          res.status(500).send();
+        });
     });

     await new Promise<void>((resolve) => writeStream.on('close', resolve));
   }
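Worth noting in startUpload above: for single-shot uploads the checksum is computed while the
body streams to disk, so completion never re-reads the file (resumed uploads instead re-hash the
file once at the end, in resumeUpload below). The pattern in isolation (sketch):

    import { createHash } from 'node:crypto';
    import { Readable, Writable } from 'node:stream';

    // Hash the body as it streams to disk, then verify against the checksum
    // the client declared (e.g. via Repr-Digest) once the write finishes.
    function verifyWhileStreaming(req: Readable, out: Writable, declared: Buffer, onMismatch: () => void) {
      const hash = createHash('sha1');
      req.on('data', (data: Buffer) => hash.update(data));
      out.on('finish', () => {
        if (declared.compare(hash.digest()) !== 0) {
          onMismatch(); // e.g. delete the partial file and answer with a problem response
        }
      });
    }
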
@@ -152,30 +118,25 @@ export class AssetUploadService extends BaseService {
   resumeUpload(req: AuthenticatedRequest, res: Response, id: string, dto: ResumeUploadDto): Promise<void> {
     this.logger.verboseFn(() => `Resuming upload for ${id}: ${JSON.stringify(dto)}`);
     const { isComplete, uploadLength, uploadOffset, contentLength, version } = dto;
-    if (isComplete && uploadLength !== undefined && uploadLength !== contentLength) {
-      this.sendInconsistentLengthProblem(res);
-      return Promise.resolve();
-    }
-
-    if (version && version >= 6 && req.headers['content-type'] !== 'application/partial-upload') {
-      throw new BadRequestException('Content-Type must be application/partial-upload for PATCH requests');
-    }

     return this.databaseRepository.withUuidLock(id, async () => {
-      const asset = await this.assetRepository.getCompletionMetadata(id, req.auth.user.id);
-      if (!asset) {
+      const completionData = await this.assetRepository.getCompletionMetadata(id, req.auth.user.id);
+      if (!completionData) {
         res.status(404).send('Asset not found');
         return;
       }
+      const { fileModifiedAt, path, status, checksum: providedChecksum, size } = completionData;

-      if (asset.status !== AssetStatus.Partial) {
+      if (status !== AssetStatus.Partial) {
+        this.setCompleteHeader(res, version, false);
         return this.sendAlreadyCompletedProblem(res);
       }
-      if (uploadOffset === null) {
-        throw new BadRequestException('Missing Upload-Offset header');
+
+      if (uploadLength && size && size !== uploadLength) {
+        this.setCompleteHeader(res, version, false);
+        return this.sendInconsistentLengthProblem(res);
       }

-      const { path } = asset;
       const expectedOffset = await this.getCurrentOffset(path);
       if (expectedOffset !== uploadOffset) {
         this.setCompleteHeader(res, version, false);
@@ -183,92 +144,103 @@ export class AssetUploadService extends BaseService {
       }

       const newLength = uploadOffset + contentLength;
-
-      // If upload length is provided, validate we're not exceeding it
       if (uploadLength !== undefined && newLength > uploadLength) {
+        this.setCompleteHeader(res, version, false);
         res.status(400).send('Upload would exceed declared length');
         return;
       }

       this.validateQuota(req.auth, newLength);

-      // Empty PATCH without Upload-Complete
       if (contentLength === 0 && !isComplete) {
         this.setCompleteHeader(res, version, false);
         res.status(204).setHeader('Upload-Offset', expectedOffset.toString()).send();
         return;
       }

-      const writeStream = this.storageRepository.createOrAppendWriteStream(path);
-      let receivedLength = 0;
-
-      writeStream.on('error', (error) => {
-        this.logger.error(`Failed to write chunk to ${path}: ${error.message}`);
-        if (!res.headersSent) {
-          res.status(500).send('Failed to write chunk');
-        }
-      });
-
+      const metadata = { id, path, size: contentLength, fileModifiedAt: fileModifiedAt };
+      const writeStream = this.pipe(req, res, metadata);
       writeStream.on('finish', async () => {
+        this.setCompleteHeader(res, version, isComplete);
         const currentOffset = await this.getCurrentOffset(path);
         if (!isComplete) {
-          this.setCompleteHeader(res, version, false);
           return res.status(204).setHeader('Upload-Offset', currentOffset.toString()).send();
         }

         this.logger.log(`Finished upload to ${path}`);
         const checksum = await this.cryptoRepository.hashFile(path);
-        if (asset.checksum.compare(checksum) !== 0) {
+        if (providedChecksum.compare(checksum) !== 0) {
           return this.sendChecksumMismatchResponse(res, id, path);
         }

         try {
-          await this.onComplete({ assetId: id, path, size: currentOffset, fileModifiedAt: asset.fileModifiedAt });
+          await this.onComplete(metadata);
         } finally {
-          this.setCompleteHeader(res, version, true);
-          res.status(200).setHeader('Upload-Offset', currentOffset.toString()).send();
+          res.status(200).send();
         }
       });
-
-      req.on('data', (chunk: Buffer) => {
-        if (receivedLength + chunk.length > contentLength) {
-          this.logger.error(`Received more data than specified in content-length for upload to ${path}`);
-          writeStream.destroy();
-          req.destroy();
-          res.status(400).send('Received more data than specified in content-length');
-          return this.onCancel(id, path);
-        }
-
-        receivedLength += chunk.length;
-        if (!writeStream.write(chunk)) {
-          req.pause();
-          writeStream.once('drain', () => req.resume());
-        }
-      });
-
-      req.on('end', () => {
-        if (receivedLength === contentLength) {
-          return writeStream.end();
-        }
-        this.logger.error(`Received ${receivedLength} bytes when expecting ${contentLength} for ${id}`);
-        writeStream.destroy();
-        return this.onCancel(id, path);
-      });

       await new Promise<void>((resolve) => writeStream.on('close', resolve));
     });
   }

-  async cancelUpload(auth: AuthDto, assetId: string, response: Response): Promise<void> {
-    const asset = await this.assetRepository.getCompletionMetadata(assetId, auth.user.id);
-    if (!asset) {
-      response.status(404).send('Asset not found');
-      return;
-    }
-    if (asset.status !== AssetStatus.Partial) {
-      return this.sendAlreadyCompletedProblem(response);
-    }
-    await this.onCancel(assetId, asset.path);
-    response.status(204).send();
+  private pipe(req: Readable, res: Response, { id, path, size }: { id: string; path: string; size: number }) {
+    const writeStream = this.storageRepository.createOrAppendWriteStream(path);
+    writeStream.on('error', (error) => {
+      this.logger.error(`Failed to write chunk to ${path}: ${error.message}`);
+      if (!res.headersSent) {
+        res.status(500).send();
+      }
+    });
+
+    req.on('error', (error) => {
+      this.logger.error(`Failed to read request body: ${error.message}`);
+      if (!res.headersSent) {
+        res.status(500).send();
+      }
+    });
+
+    let receivedLength = 0;
+    req.on('data', (data: Buffer) => {
+      if (receivedLength + data.length > size) {
+        writeStream.destroy();
+        req.destroy();
+        return this.onCancel(id, path).finally(() =>
+          res.status(400).send('Received more data than specified in content-length'),
+        );
+      }
+      receivedLength += data.length;
+      if (!writeStream.write(data)) {
+        req.pause();
+        writeStream.once('drain', () => req.resume());
+      }
+    });
+
+    req.on('end', () => {
+      if (receivedLength === size) {
+        return writeStream.end();
+      }
+      writeStream.destroy();
+      this.onCancel(id, path).finally(() =>
+        res.status(400).send(`Received ${receivedLength} bytes when expecting ${size}`),
+      );
+    });
+
+    return writeStream;
+  }
+
+  cancelUpload(auth: AuthDto, assetId: string, response: Response): Promise<void> {
+    return this.databaseRepository.withUuidLock(assetId, async () => {
+      const asset = await this.assetRepository.getCompletionMetadata(assetId, auth.user.id);
+      if (!asset) {
+        response.status(404).send('Asset not found');
+        return;
+      }
+      if (asset.status !== AssetStatus.Partial) {
+        return this.sendAlreadyCompletedProblem(response);
+      }
+      await this.onCancel(assetId, asset.path);
+      response.status(204).send();
+    });
   }

   async getUploadStatus(auth: AuthDto, res: Response, id: string, { version }: GetUploadStatusDto): Promise<void> {
@@ -280,9 +252,7 @@ export class AssetUploadService extends BaseService {
     }

     const offset = await this.getCurrentOffset(asset.path);
-    const isComplete = asset.status !== AssetStatus.Partial;
-
-    this.setCompleteHeader(res, version, isComplete);
+    this.setCompleteHeader(res, version, asset.status !== AssetStatus.Partial);
     res
       .status(204)
       .setHeader('Upload-Offset', offset.toString())
@@ -296,11 +266,10 @@ export class AssetUploadService extends BaseService {
     response.status(204).setHeader('Upload-Limit', 'min-size=0').setHeader('Allow', 'POST, OPTIONS').send();
   }

-  private async onComplete(data: { assetId: string; path: string; size: number; fileModifiedAt: Date }): Promise<void> {
-    const { assetId, path, size, fileModifiedAt } = data;
-    this.logger.debug('Completing upload for asset', assetId);
-    const jobData = { name: JobName.AssetExtractMetadata, data: { id: assetId, source: 'upload' } } as const;
-    await withRetry(() => this.assetRepository.setCompleteWithSize(assetId, size));
+  private async onComplete({ id, path, fileModifiedAt }: { id: string; path: string; fileModifiedAt: Date }) {
+    this.logger.debug('Completing upload for asset', id);
+    const jobData = { name: JobName.AssetExtractMetadata, data: { id: id, source: 'upload' } } as const;
+    await withRetry(() => this.assetRepository.setCompleteWithSize(id));
     try {
       await withRetry(() => this.storageRepository.utimes(path, new Date(), fileModifiedAt));
     } catch (error: any) {
@@ -321,6 +290,7 @@ export class AssetUploadService extends BaseService {
     socket.write(
       'HTTP/1.1 104 Upload Resumption Supported\r\n' +
         `Location: ${location}\r\n` +
+        `Upload-Limit: min-size=0\r\n` +
         `Upload-Draft-Interop-Version: ${interopVersion}\r\n\r\n`,
     );
   }
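For a sense of how a client consumes these endpoints end to end, a hedged sketch of the resume
flow the e2e specs exercise — probe the current offset, then PATCH the remainder from that
offset (header names follow the RUFH interop draft version 8 used by the tests; this client code
is illustrative and not part of the patch):

    // Resume an interrupted upload against the resource URL returned in the
    // Location header of the initial POST /upload response.
    async function resume(uploadUrl: string, file: Buffer, token: string): Promise<Response> {
      // Ask the server how much it already has.
      const head = await fetch(uploadUrl, {
        method: 'HEAD',
        headers: { Authorization: `Bearer ${token}`, 'Upload-Draft-Interop-Version': '8' },
      });
      const offset = Number(head.headers.get('upload-offset') ?? 0);

      // Send the remaining bytes from that offset and mark the upload complete.
      return fetch(uploadUrl, {
        method: 'PATCH',
        headers: {
          Authorization: `Bearer ${token}`,
          'Upload-Draft-Interop-Version': '8',
          'Upload-Offset': String(offset),
          'Upload-Complete': '?1',
          'Content-Type': 'application/partial-upload',
        },
        body: file.subarray(offset),
      });
    }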