From 758553672a9c6dd59f791e301e019eeec94a0caa Mon Sep 17 00:00:00 2001 From: mertalev <101130780+mertalev@users.noreply.github.com> Date: Wed, 8 Oct 2025 00:02:36 -0400 Subject: [PATCH] proactive abortion --- e2e/src/api/specs/asset-upload.e2e-spec.ts | 20 +++- .../controllers/asset-upload.controller.ts | 1 - .../src/repositories/database.repository.ts | 16 ++-- server/src/repositories/event.repository.ts | 5 +- server/src/services/asset-upload.service.ts | 95 ++++++++++--------- 5 files changed, 79 insertions(+), 58 deletions(-) diff --git a/e2e/src/api/specs/asset-upload.e2e-spec.ts b/e2e/src/api/specs/asset-upload.e2e-spec.ts index a9998bfc8f..83a1bad6cc 100644 --- a/e2e/src/api/specs/asset-upload.e2e-spec.ts +++ b/e2e/src/api/specs/asset-upload.e2e-spec.ts @@ -318,6 +318,24 @@ describe('/upload', () => { expect(status).toBe(400); expect(body).toEqual(errorDto.badRequest('Quota has been exceeded!')); }); + + it('should reject when request body is larger than declared content length', async () => { + const length = 1024 * 1024; + const content = randomBytes(length); + + const { status } = await request(app) + .post('/upload') + .set('Authorization', `Bearer ${user.accessToken}`) + .set('Upload-Draft-Interop-Version', '8') + .set('X-Immich-Asset-Data', assetData) + .set('Repr-Digest', `sha=:${createHash('sha1').update(content).digest('base64')}:`) + .set('Upload-Complete', '?0') + .set('Upload-Length', length.toString()) + .set('Content-Length', (length - 1).toString()) + .send(content); + + expect(status).toBe(400); + }); }); describe('resumeUpload', () => { @@ -531,7 +549,7 @@ describe('/upload', () => { expect(resumeResponse.headers['upload-complete']).toBe('?1'); }); - it('should handle multiple interruptions and resumptions', async () => { + it('should handle multiple chunks', async () => { const chunks = [randomBytes(2000), randomBytes(3000), randomBytes(5000)]; const hash = createHash('sha1'); for (const chunk of chunks) { diff --git 
a/server/src/controllers/asset-upload.controller.ts b/server/src/controllers/asset-upload.controller.ts index 7955291fbc..e306f77e66 100644 --- a/server/src/controllers/asset-upload.controller.ts +++ b/server/src/controllers/asset-upload.controller.ts @@ -42,7 +42,6 @@ const apiContentLength = { }; // This is important to let go of the asset lock for an inactive request -// TODO: the recommendation is for a later request to explicitly abort the inactive one rather than waiting for timeout const SOCKET_TIMEOUT_MS = 30_000; @ApiTags('Upload') diff --git a/server/src/repositories/database.repository.ts b/server/src/repositories/database.repository.ts index 369e597a55..73aa37e362 100644 --- a/server/src/repositories/database.repository.ts +++ b/server/src/repositories/database.repository.ts @@ -453,15 +453,13 @@ export class DatabaseRepository { async withUuidLock(uuid: string, callback: () => Promise): Promise { let res; - await this.asyncLock.acquire(uuid, async () => { - await this.db.connection().execute(async (connection) => { - try { - await this.acquireUuidLock(uuid, connection); - res = await callback(); - } finally { - await this.releaseUuidLock(uuid, connection); - } - }); + await this.db.connection().execute(async (connection) => { + try { + await this.acquireUuidLock(uuid, connection); + res = await callback(); + } finally { + await this.releaseUuidLock(uuid, connection); + } }); return res as R; diff --git a/server/src/repositories/event.repository.ts b/server/src/repositories/event.repository.ts index ec4c8a8f52..03f5b6fbb9 100644 --- a/server/src/repositories/event.repository.ts +++ b/server/src/repositories/event.repository.ts @@ -80,6 +80,9 @@ type EventMap = { // stack bulk events StackDeleteAll: [{ stackIds: string[]; userId: string }]; + // upload events + UploadAbort: [{ assetId: string; abortTime: Date }]; + // user events UserSignup: [{ notify: boolean; id: string; password?: string }]; @@ -87,7 +90,7 @@ type EventMap = { WebsocketConnect: [{ 
userId: string }]; }; -export const serverEvents = ['ConfigUpdate'] as const; +export const serverEvents = ['ConfigUpdate', 'UploadAbort'] as const; export type ServerEvents = (typeof serverEvents)[number]; export type EmitEvent = keyof EventMap; diff --git a/server/src/services/asset-upload.service.ts b/server/src/services/asset-upload.service.ts index f8efe6e03b..86255eca66 100644 --- a/server/src/services/asset-upload.service.ts +++ b/server/src/services/asset-upload.service.ts @@ -6,7 +6,7 @@ import { extname, join } from 'node:path'; import { Readable } from 'node:stream'; import { JOBS_ASSET_PAGINATION_SIZE } from 'src/constants'; import { StorageCore } from 'src/cores/storage.core'; -import { OnJob } from 'src/decorators'; +import { OnEvent, OnJob } from 'src/decorators'; import { GetUploadStatusDto, ResumeUploadDto, StartUploadDto } from 'src/dtos/asset-upload'; import { AuthDto } from 'src/dtos/auth.dto'; import { @@ -14,11 +14,13 @@ import { AssetStatus, AssetType, AssetVisibility, + ImmichWorker, JobName, JobStatus, QueueName, StorageFolder, } from 'src/enum'; +import { ArgOf } from 'src/repositories/event.repository'; import { BaseService } from 'src/services/base.service'; import { JobItem, JobOf } from 'src/types'; import { isAssetChecksumConstraint } from 'src/utils/database'; @@ -29,6 +31,24 @@ export const MAX_RUFH_INTEROP_VERSION = 8; @Injectable() export class AssetUploadService extends BaseService { + // This is used to proactively abort previous requests for the same asset + // when a new one arrives. The previous request still holds the asset lock + // and will prevent the new request from proceeding until the previous one + // times out. As normal client behavior will not have concurrent requests, + // we can assume the previous request has already failed on the client end. 
+ private activeRequests = new Map(); + + @OnEvent({ name: 'UploadAbort', workers: [ImmichWorker.Api] }) + onUploadAbort({ assetId, abortTime }: ArgOf<'UploadAbort'>) { + const entry = this.activeRequests.get(assetId); + if (entry && abortTime > entry.startTime) { + this.activeRequests.delete(assetId); + entry.req.destroy(); + return true; + } + return false; + } + async startUpload(auth: AuthDto, req: Readable, res: Response, dto: StartUploadDto): Promise { this.logger.verboseFn(() => `Starting upload: ${JSON.stringify(dto)}`); const { isComplete, assetData, uploadLength, contentLength, version } = dto; @@ -57,15 +77,15 @@ export class AssetUploadService extends BaseService { this.sendInterimResponse(res, location, version); } + this.addRequest(asset.id, req); let checksumBuffer: Buffer | undefined; - const metadata = { id: asset.id, path: asset.path, size: contentLength, fileModifiedAt: assetData.fileModifiedAt }; - const writeStream = this.pipe(req, res, metadata); - + const writeStream = this.storageRepository.createOrAppendWriteStream(asset.path); if (isComplete) { const hash = createHash('sha1'); req.on('data', (data: Buffer) => hash.update(data)); writeStream.on('finish', () => (checksumBuffer = hash.digest())); } + req.pipe(writeStream); await new Promise((resolve, reject) => writeStream.on('finish', resolve).on('close', reject)); this.setCompleteHeader(res, dto.version, isComplete); if (!isComplete) { @@ -77,7 +97,7 @@ export class AssetUploadService extends BaseService { return await this.sendChecksumMismatch(res, asset.id, asset.path); } - await this.onComplete(metadata); + await this.onComplete({ id: asset.id, path: asset.path, fileModifiedAt: assetData.fileModifiedAt }); res.status(200).send({ id: asset.id }); } @@ -85,6 +105,7 @@ export class AssetUploadService extends BaseService { this.logger.verboseFn(() => `Resuming upload for ${id}: ${JSON.stringify(dto)}`); const { isComplete, uploadLength, uploadOffset, contentLength, version } = dto; 
this.setCompleteHeader(res, version, false); + this.addRequest(id, req); return this.databaseRepository.withUuidLock(id, async () => { const completionData = await this.assetRepository.getCompletionMetadata(id, auth.user.id); if (!completionData) { @@ -117,8 +138,8 @@ export class AssetUploadService extends BaseService { return; } - const metadata = { id, path, size: contentLength, fileModifiedAt }; - const writeStream = this.pipe(req, res, metadata); + const writeStream = this.storageRepository.createOrAppendWriteStream(path); + req.pipe(writeStream); await new Promise((resolve, reject) => writeStream.on('finish', resolve).on('close', reject)); this.setCompleteHeader(res, version, isComplete); if (!isComplete) { @@ -138,12 +159,13 @@ export class AssetUploadService extends BaseService { return await this.sendChecksumMismatch(res, id, path); } - await this.onComplete(metadata); + await this.onComplete({ id, path, fileModifiedAt }); res.status(200).send({ id }); }); } cancelUpload(auth: AuthDto, assetId: string, res: Response): Promise { + this.abortExistingRequest(assetId); return this.databaseRepository.withUuidLock(assetId, async () => { const asset = await this.assetRepository.getCompletionMetadata(assetId, auth.user.id); if (!asset) { @@ -160,6 +182,7 @@ export class AssetUploadService extends BaseService { async getUploadStatus(auth: AuthDto, res: Response, id: string, { version }: GetUploadStatusDto): Promise { this.logger.verboseFn(() => `Getting upload status for ${id} with version ${version}`); + this.abortExistingRequest(id); return this.databaseRepository.withUuidLock(id, async () => { const asset = await this.assetRepository.getCompletionMetadata(id, auth.user.id); if (!asset) { @@ -290,45 +313,24 @@ export class AssetUploadService extends BaseService { await withRetry(() => this.assetRepository.removeAndDecrementQuota(assetId)); } - private pipe(req: Readable, res: Response, { id, path, size }: { id: string; path: string; size: number }) { - const 
writeStream = this.storageRepository.createOrAppendWriteStream(path); - writeStream.on('error', (error) => { - this.logger.error(`Failed to write chunk to ${path}: ${error.message}`); - if (!res.headersSent) { - res.status(500).send(); + private addRequest(assetId: string, req: Readable) { + const addTime = new Date(); + const activeRequest = { req, startTime: addTime }; + this.abortExistingRequest(assetId, addTime); + this.activeRequests.set(assetId, activeRequest); + req.on('close', () => { + if (this.activeRequests.get(assetId)?.req === req) { + this.activeRequests.delete(assetId); } }); + } - req.on('error', (error) => { - this.logger.error(`Failed to read request body: ${error.message}`); - if (!res.headersSent) { - res.status(500).send(); - } - }); - - let receivedLength = 0; - req.on('data', (data: Buffer) => { - if (receivedLength + data.length > size) { - writeStream.destroy(); - void this.onCancel(id, path).catch((error: any) => - this.logger.error(`Failed to remove ${id} after too much data: ${error.message}`), - ); - if (!res.headersSent) { - res.status(400).send('Received more data than specified in content-length'); - } - res.on('finish', () => req.destroy()); - return; - } - receivedLength += data.length; - if (!writeStream.write(data)) { - req.pause(); - writeStream.once('drain', () => req.resume()); - } - }); - - req.on('end', () => writeStream.end()); - - return writeStream; + private abortExistingRequest(assetId: string, abortTime = new Date()) { + const abortEvent = { assetId, abortTime }; + // only emit if we didn't just abort it ourselves + if (!this.onUploadAbort(abortEvent)) { + this.eventRepository.serverSend('UploadAbort', abortEvent); + } } private sendInterimResponse({ socket }: Response, location: string, interopVersion: number): void { @@ -373,11 +375,12 @@ export class AssetUploadService extends BaseService { } private validateQuota(auth: AuthDto, size: number): void { - if (auth.user.quotaSizeInBytes === null) { + const { 
quotaSizeInBytes: quotaLimit, quotaUsageInBytes: currentUsage } = auth.user; + if (quotaLimit === null) { return; } - if (auth.user.quotaSizeInBytes < auth.user.quotaUsageInBytes + size) { + if (quotaLimit < currentUsage + size) { throw new BadRequestException('Quota has been exceeded!'); } }