From 98c8c28b62d9626e4d68b7f9cfaa57b1a58242c1 Mon Sep 17 00:00:00 2001
From: mertalev <101130780+mertalev@users.noreply.github.com>
Date: Wed, 8 Oct 2025 16:41:42 -0400
Subject: [PATCH] test interruption + abort

---
 e2e/src/api/specs/asset-upload.e2e-spec.ts  | 108 +++++++++++++++-----
 server/src/services/asset-upload.service.ts |  33 ++++--
 2 files changed, 110 insertions(+), 31 deletions(-)

diff --git a/e2e/src/api/specs/asset-upload.e2e-spec.ts b/e2e/src/api/specs/asset-upload.e2e-spec.ts
index 0c34dbfc94..dacd1518e2 100644
--- a/e2e/src/api/specs/asset-upload.e2e-spec.ts
+++ b/e2e/src/api/specs/asset-upload.e2e-spec.ts
@@ -1,7 +1,9 @@
 import { getMyUser, LoginResponseDto } from '@immich/sdk';
 import { createHash, randomBytes } from 'node:crypto';
 import { readFile } from 'node:fs/promises';
+import { request as httpRequest } from 'node:http';
 import { join } from 'node:path';
+import { setTimeout } from 'node:timers/promises';
 import { Socket } from 'socket.io-client';
 import { createUserDto } from 'src/fixtures';
 import { errorDto } from 'src/responses';
@@ -318,29 +320,6 @@ describe('/upload', () => {
       expect(status).toBe(400);
       expect(body).toEqual(errorDto.badRequest('Quota has been exceeded!'));
     });
-
-    // The current implementation depends on the web server to enforce
-    // this as this case does not even reach app code, but we test a few
-    // values here to make sure it stays that way.
-    it.each([1337, 27, 1024 * 1024 + 5, 512 * 512 + 1])(
-      'should reject when request body is larger than declared content length of %d bytes',
-      async (length) => {
-        const content = randomBytes(length);
-
-        const { status } = await request(app)
-          .post('/upload')
-          .set('Authorization', `Bearer ${user.accessToken}`)
-          .set('Upload-Draft-Interop-Version', '8')
-          .set('X-Immich-Asset-Data', assetData)
-          .set('Repr-Digest', `sha=:${createHash('sha1').update(content).digest('base64')}:`)
-          .set('Upload-Complete', '?0')
-          .set('Upload-Length', length.toString())
-          .set('Content-Length', (length - 1).toString())
-          .send(content);
-
-        expect(status).toBe(400);
-      },
-    );
   });
 
   describe('resumeUpload', () => {
@@ -518,7 +497,7 @@ describe('/upload', () => {
       });
     });
 
-    it('should handle interrupted initial upload and resume', async () => {
+    it('should handle resume with offset retrieval', async () => {
       const totalContent = randomBytes(5000);
       const firstPart = totalContent.subarray(0, 2000);
 
@@ -605,6 +584,87 @@ describe('/upload', () => {
       expect(response.status).toBe(200);
       expect(response.headers['upload-complete']).toBe('?1');
     });
+
+    it('should abort previous request on new request for same asset', async () => {
+      const content = randomBytes(10000);
+      const checksum = createHash('sha1').update(content).digest('base64');
+
+      const createResponse = await request(app)
+        .post('/upload')
+        .set('Authorization', `Bearer ${user.accessToken}`)
+        .set('Upload-Draft-Interop-Version', '8')
+        .set('X-Immich-Asset-Data', assetData)
+        .set('Repr-Digest', `sha=:${checksum}:`)
+        .set('Upload-Complete', '?0')
+        .set('Upload-Length', '10000')
+        .send();
+
+      expect(createResponse.status).toBe(201);
+      const uploadResource = createResponse.headers['location'];
+      expect(uploadResource).toBeDefined();
+
+      // simulate interrupted upload by starting a request and not completing it
+      const didAbort = new Promise((resolve, reject) => {
+        const req = httpRequest(
+          {
+            hostname: 'localhost',
+            port: 2285,
+            path: uploadResource,
+            method: 'PATCH',
+            headers: {
+              Authorization: `Bearer ${user.accessToken}`,
+              'Upload-Draft-Interop-Version': '8',
+              'X-Immich-Asset-Data': assetData,
+              'Repr-Digest': `sha=:${checksum}:`,
+              'Upload-Complete': '?1',
+              'Upload-Length': '10000',
+              'Content-Length': '10000',
+              'Upload-Offset': '0',
+              'Content-Type': 'application/partial-upload',
+            },
+          },
+          (res) => res.on('close', () => resolve(false)),
+        );
+
+        req.on('error', (err) => {
+          console.log('First request error:', err.message);
+          if (err.message === 'socket hang up') {
+            resolve(true);
+          } else {
+            reject(err);
+          }
+        });
+
+        req.write(content.subarray(0, 2000));
+      });
+
+      await setTimeout(50);
+
+      const headResponse = await request(baseUrl)
+        .head(uploadResource)
+        .set('Authorization', `Bearer ${user.accessToken}`)
+        .set('Upload-Draft-Interop-Version', '8');
+
+      expect(headResponse.status).toBe(204);
+      expect(headResponse.headers['upload-offset']).toBe('2000');
+      expect(headResponse.headers['upload-complete']).toBe('?0');
+
+      expect(await didAbort).toBe(true);
+
+      const secondResponse = await request(baseUrl)
+        .patch(uploadResource)
+        .set('Authorization', `Bearer ${user.accessToken}`)
+        .set('Upload-Draft-Interop-Version', '8')
+        .set('X-Immich-Asset-Data', assetData)
+        .set('Repr-Digest', `sha=:${checksum}:`)
+        .set('Upload-Complete', '?1')
+        .set('Upload-Length', '10000')
+        .set('Content-Type', 'application/partial-upload')
+        .set('Upload-Offset', '2000')
+        .send(content.subarray(2000));
+
+      expect(secondResponse.status).toBe(200);
+    });
   });
 
   describe('cancelUpload', () => {
diff --git a/server/src/services/asset-upload.service.ts b/server/src/services/asset-upload.service.ts
index f9df0f80d2..b9c4c2f3ed 100644
--- a/server/src/services/asset-upload.service.ts
+++ b/server/src/services/asset-upload.service.ts
@@ -38,7 +38,7 @@ export class AssetUploadService extends BaseService {
   // we can assume the previous request has already failed on the client end.
   private activeRequests = new Map();
 
-  @OnEvent({ name: 'UploadAbort', workers: [ImmichWorker.Api] })
+  @OnEvent({ name: 'UploadAbort', workers: [ImmichWorker.Api], server: true })
   onUploadAbort({ assetId, abortTime }: ArgOf<'UploadAbort'>) {
     const entry = this.activeRequests.get(assetId);
     if (!entry) {
@@ -80,14 +80,13 @@ export class AssetUploadService extends BaseService {
     this.addRequest(asset.id, req);
 
     let checksumBuffer: Buffer | undefined;
-    const writeStream = this.storageRepository.createOrAppendWriteStream(asset.path);
+    const writeStream = this.pipe(req, asset.path, contentLength);
     if (isComplete) {
       const hash = createHash('sha1');
      req.on('data', (data: Buffer) => hash.update(data));
       writeStream.on('finish', () => (checksumBuffer = hash.digest()));
     }
-    req.pipe(writeStream);
-    await new Promise((resolve, reject) => writeStream.on('finish', resolve).on('close', reject));
+    await new Promise((resolve, reject) => writeStream.on('close', resolve).on('error', reject));
     this.setCompleteHeader(res, dto.version, isComplete);
     if (!isComplete) {
       res.status(201).set('Location', location).setHeader('Upload-Limit', 'min-size=0').send();
@@ -139,9 +138,8 @@
       return;
     }
 
-    const writeStream = this.storageRepository.createOrAppendWriteStream(path);
-    req.pipe(writeStream);
-    await new Promise((resolve, reject) => writeStream.on('finish', resolve).on('close', reject));
+    const writeStream = this.pipe(req, path, contentLength);
+    await new Promise((resolve, reject) => writeStream.on('close', resolve).on('error', reject));
     this.setCompleteHeader(res, version, isComplete);
     if (!isComplete) {
       try {
@@ -334,6 +332,27 @@ export class AssetUploadService extends BaseService {
     }
   }
 
+  private pipe(req: Readable, path: string, size: number) {
+    const writeStream = this.storageRepository.createOrAppendWriteStream(path);
+    let receivedLength = 0;
+    req.on('data', (data: Buffer) => {
+      receivedLength += data.length;
+      if (!writeStream.write(data)) {
+        req.pause();
+        writeStream.once('drain', () => req.resume());
+      }
+    });
+
+    req.on('close', () => {
+      if (receivedLength < size) {
+        writeStream.emit('error', new Error('Request closed before all data received'));
+      }
+      writeStream.end();
+    });
+
+    return writeStream;
+  }
+
   private sendInterimResponse({ socket }: Response, location: string, interopVersion: number): void {
     if (socket && !socket.destroyed) {
       // Express doesn't understand interim responses, so write directly to socket
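
Reviewer note (not part of the patch): the new private pipe() helper replaces req.pipe(writeStream) so the service can apply backpressure by hand and treat a client that disconnects before sending the declared byte count as an error rather than a normal finish, which is what the new abort test exercises. Below is a minimal standalone sketch of that same pattern using plain Node streams; pipeWithBackpressure and expectedBytes are illustrative names, not identifiers from the Immich codebase.

import { createWriteStream } from 'node:fs';
import type { Readable } from 'node:stream';

function pipeWithBackpressure(source: Readable, path: string, expectedBytes: number) {
  // Append so a resumed upload continues the existing partial file.
  const sink = createWriteStream(path, { flags: 'a' });
  let received = 0;

  source.on('data', (chunk: Buffer) => {
    received += chunk.length;
    // write() returns false when the sink's internal buffer is full;
    // pause the source until it drains so memory use stays bounded.
    if (!sink.write(chunk)) {
      source.pause();
      sink.once('drain', () => source.resume());
    }
  });

  // 'close' also fires when the client disconnects mid-request, so a short
  // read surfaces as an error instead of looking like a clean completion.
  // The caller is expected to listen for 'close'/'error' on the returned stream.
  source.on('close', () => {
    if (received < expectedBytes) {
      sink.emit('error', new Error('source closed before all data was received'));
    }
    sink.end();
  });

  return sink;
}

Compared with req.pipe(), this manual loop is what lets the service distinguish "client went away early" from end-of-stream, matching the patch's switch from awaiting 'finish' to awaiting 'close'/'error' on the write stream.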