test interruption + abort

This commit is contained in:
mertalev 2025-10-08 16:41:42 -04:00
parent ebda00fcf0
commit c295a48061
No known key found for this signature in database
GPG key ID: DF6ABC77AAD98C95
2 changed files with 110 additions and 31 deletions

View file

@ -1,7 +1,9 @@
import { getMyUser, LoginResponseDto } from '@immich/sdk';
import { createHash, randomBytes } from 'node:crypto';
import { readFile } from 'node:fs/promises';
import { request as httpRequest } from 'node:http';
import { join } from 'node:path';
import { setTimeout } from 'node:timers/promises';
import { Socket } from 'socket.io-client';
import { createUserDto } from 'src/fixtures';
import { errorDto } from 'src/responses';
@ -318,29 +320,6 @@ describe('/upload', () => {
expect(status).toBe(400);
expect(body).toEqual(errorDto.badRequest('Quota has been exceeded!'));
});
// The current implementation depends on the web server to enforce
// this as this case does not even reach app code, but we test a few
// values here to make sure it stays that way.
it.each([1337, 27, 1024 * 1024 + 5, 512 * 512 + 1])(
'should reject when request body is larger than declared content length of %d bytes',
async (length) => {
const content = randomBytes(length);
const { status } = await request(app)
.post('/upload')
.set('Authorization', `Bearer ${user.accessToken}`)
.set('Upload-Draft-Interop-Version', '8')
.set('X-Immich-Asset-Data', assetData)
.set('Repr-Digest', `sha=:${createHash('sha1').update(content).digest('base64')}:`)
.set('Upload-Complete', '?0')
.set('Upload-Length', length.toString())
.set('Content-Length', (length - 1).toString())
.send(content);
expect(status).toBe(400);
},
);
});
describe('resumeUpload', () => {
@ -518,7 +497,7 @@ describe('/upload', () => {
});
});
it('should handle interrupted initial upload and resume', async () => {
it('should handle resume with offset retrieval', async () => {
const totalContent = randomBytes(5000);
const firstPart = totalContent.subarray(0, 2000);
@ -605,6 +584,87 @@ describe('/upload', () => {
expect(response.status).toBe(200);
expect(response.headers['upload-complete']).toBe('?1');
});
it('should abort previous request on new request for same asset', async () => {
const content = randomBytes(10000);
const checksum = createHash('sha1').update(content).digest('base64');
const createResponse = await request(app)
.post('/upload')
.set('Authorization', `Bearer ${user.accessToken}`)
.set('Upload-Draft-Interop-Version', '8')
.set('X-Immich-Asset-Data', assetData)
.set('Repr-Digest', `sha=:${checksum}:`)
.set('Upload-Complete', '?0')
.set('Upload-Length', '10000')
.send();
expect(createResponse.status).toBe(201);
const uploadResource = createResponse.headers['location'];
expect(uploadResource).toBeDefined();
// simulate interrupted upload by starting a request and not completing it
const didAbort = new Promise<boolean>((resolve, reject) => {
const req = httpRequest(
{
hostname: 'localhost',
port: 2285,
path: uploadResource,
method: 'PATCH',
headers: {
Authorization: `Bearer ${user.accessToken}`,
'Upload-Draft-Interop-Version': '8',
'X-Immich-Asset-Data': assetData,
'Repr-Digest': `sha=:${checksum}:`,
'Upload-Complete': '?1',
'Upload-Length': '10000',
'Content-Length': '10000',
'Upload-Offset': '0',
'Content-Type': 'application/partial-upload',
},
},
(res) => res.on('close', () => resolve(false)),
);
req.on('error', (err) => {
console.log('First request error:', err.message);
if (err.message === 'socket hang up') {
resolve(true);
} else {
reject(err);
}
});
req.write(content.subarray(0, 2000));
});
await setTimeout(50);
const headResponse = await request(baseUrl)
.head(uploadResource)
.set('Authorization', `Bearer ${user.accessToken}`)
.set('Upload-Draft-Interop-Version', '8');
expect(headResponse.status).toBe(204);
expect(headResponse.headers['upload-offset']).toBe('2000');
expect(headResponse.headers['upload-complete']).toBe('?0');
expect(await didAbort).toBe(true);
const secondResponse = await request(baseUrl)
.patch(uploadResource)
.set('Authorization', `Bearer ${user.accessToken}`)
.set('Upload-Draft-Interop-Version', '8')
.set('X-Immich-Asset-Data', assetData)
.set('Repr-Digest', `sha=:${checksum}:`)
.set('Upload-Complete', '?1')
.set('Upload-Length', '10000')
.set('Content-Type', 'application/partial-upload')
.set('Upload-Offset', '2000')
.send(content.subarray(2000));
expect(secondResponse.status).toBe(200);
});
});
describe('cancelUpload', () => {

View file

@ -38,7 +38,7 @@ export class AssetUploadService extends BaseService {
// we can assume the previous request has already failed on the client end.
private activeRequests = new Map<string, { req: Readable; startTime: Date }>();
@OnEvent({ name: 'UploadAbort', workers: [ImmichWorker.Api] })
@OnEvent({ name: 'UploadAbort', workers: [ImmichWorker.Api], server: true })
onUploadAbort({ assetId, abortTime }: ArgOf<'UploadAbort'>) {
const entry = this.activeRequests.get(assetId);
if (!entry) {
@ -80,14 +80,13 @@ export class AssetUploadService extends BaseService {
this.addRequest(asset.id, req);
let checksumBuffer: Buffer | undefined;
const writeStream = this.storageRepository.createOrAppendWriteStream(asset.path);
const writeStream = this.pipe(req, asset.path, contentLength);
if (isComplete) {
const hash = createHash('sha1');
req.on('data', (data: Buffer) => hash.update(data));
writeStream.on('finish', () => (checksumBuffer = hash.digest()));
}
req.pipe(writeStream);
await new Promise((resolve, reject) => writeStream.on('finish', resolve).on('close', reject));
await new Promise((resolve, reject) => writeStream.on('close', resolve).on('error', reject));
this.setCompleteHeader(res, dto.version, isComplete);
if (!isComplete) {
res.status(201).set('Location', location).setHeader('Upload-Limit', 'min-size=0').send();
@ -139,9 +138,8 @@ export class AssetUploadService extends BaseService {
return;
}
const writeStream = this.storageRepository.createOrAppendWriteStream(path);
req.pipe(writeStream);
await new Promise((resolve, reject) => writeStream.on('finish', resolve).on('close', reject));
const writeStream = this.pipe(req, path, contentLength);
await new Promise((resolve, reject) => writeStream.on('close', resolve).on('error', reject));
this.setCompleteHeader(res, version, isComplete);
if (!isComplete) {
try {
@ -334,6 +332,27 @@ export class AssetUploadService extends BaseService {
}
}
private pipe(req: Readable, path: string, size: number) {
const writeStream = this.storageRepository.createOrAppendWriteStream(path);
let receivedLength = 0;
req.on('data', (data: Buffer) => {
receivedLength += data.length;
if (!writeStream.write(data)) {
req.pause();
writeStream.once('drain', () => req.resume());
}
});
req.on('close', () => {
if (receivedLength < size) {
writeStream.emit('error', new Error('Request closed before all data received'));
}
writeStream.end();
});
return writeStream;
}
private sendInterimResponse({ socket }: Response, location: string, interopVersion: number): void {
if (socket && !socket.destroyed) {
// Express doesn't understand interim responses, so write directly to socket