mirror of
https://github.com/immich-app/immich
synced 2025-11-14 17:36:12 +00:00
test interruption + abort
This commit is contained in:
parent
6b1d26d3a2
commit
98c8c28b62
2 changed files with 110 additions and 31 deletions
|
|
@ -1,7 +1,9 @@
|
||||||
import { getMyUser, LoginResponseDto } from '@immich/sdk';
|
import { getMyUser, LoginResponseDto } from '@immich/sdk';
|
||||||
import { createHash, randomBytes } from 'node:crypto';
|
import { createHash, randomBytes } from 'node:crypto';
|
||||||
import { readFile } from 'node:fs/promises';
|
import { readFile } from 'node:fs/promises';
|
||||||
|
import { request as httpRequest } from 'node:http';
|
||||||
import { join } from 'node:path';
|
import { join } from 'node:path';
|
||||||
|
import { setTimeout } from 'node:timers/promises';
|
||||||
import { Socket } from 'socket.io-client';
|
import { Socket } from 'socket.io-client';
|
||||||
import { createUserDto } from 'src/fixtures';
|
import { createUserDto } from 'src/fixtures';
|
||||||
import { errorDto } from 'src/responses';
|
import { errorDto } from 'src/responses';
|
||||||
|
|
@ -318,29 +320,6 @@ describe('/upload', () => {
|
||||||
expect(status).toBe(400);
|
expect(status).toBe(400);
|
||||||
expect(body).toEqual(errorDto.badRequest('Quota has been exceeded!'));
|
expect(body).toEqual(errorDto.badRequest('Quota has been exceeded!'));
|
||||||
});
|
});
|
||||||
|
|
||||||
// The current implementation depends on the web server to enforce
|
|
||||||
// this as this case does not even reach app code, but we test a few
|
|
||||||
// values here to make sure it stays that way.
|
|
||||||
it.each([1337, 27, 1024 * 1024 + 5, 512 * 512 + 1])(
|
|
||||||
'should reject when request body is larger than declared content length of %d bytes',
|
|
||||||
async (length) => {
|
|
||||||
const content = randomBytes(length);
|
|
||||||
|
|
||||||
const { status } = await request(app)
|
|
||||||
.post('/upload')
|
|
||||||
.set('Authorization', `Bearer ${user.accessToken}`)
|
|
||||||
.set('Upload-Draft-Interop-Version', '8')
|
|
||||||
.set('X-Immich-Asset-Data', assetData)
|
|
||||||
.set('Repr-Digest', `sha=:${createHash('sha1').update(content).digest('base64')}:`)
|
|
||||||
.set('Upload-Complete', '?0')
|
|
||||||
.set('Upload-Length', length.toString())
|
|
||||||
.set('Content-Length', (length - 1).toString())
|
|
||||||
.send(content);
|
|
||||||
|
|
||||||
expect(status).toBe(400);
|
|
||||||
},
|
|
||||||
);
|
|
||||||
});
|
});
|
||||||
|
|
||||||
describe('resumeUpload', () => {
|
describe('resumeUpload', () => {
|
||||||
|
|
@ -518,7 +497,7 @@ describe('/upload', () => {
|
||||||
});
|
});
|
||||||
});
|
});
|
||||||
|
|
||||||
it('should handle interrupted initial upload and resume', async () => {
|
it('should handle resume with offset retrieval', async () => {
|
||||||
const totalContent = randomBytes(5000);
|
const totalContent = randomBytes(5000);
|
||||||
const firstPart = totalContent.subarray(0, 2000);
|
const firstPart = totalContent.subarray(0, 2000);
|
||||||
|
|
||||||
|
|
@ -605,6 +584,87 @@ describe('/upload', () => {
|
||||||
expect(response.status).toBe(200);
|
expect(response.status).toBe(200);
|
||||||
expect(response.headers['upload-complete']).toBe('?1');
|
expect(response.headers['upload-complete']).toBe('?1');
|
||||||
});
|
});
|
||||||
|
|
||||||
|
it('should abort previous request on new request for same asset', async () => {
|
||||||
|
const content = randomBytes(10000);
|
||||||
|
const checksum = createHash('sha1').update(content).digest('base64');
|
||||||
|
|
||||||
|
const createResponse = await request(app)
|
||||||
|
.post('/upload')
|
||||||
|
.set('Authorization', `Bearer ${user.accessToken}`)
|
||||||
|
.set('Upload-Draft-Interop-Version', '8')
|
||||||
|
.set('X-Immich-Asset-Data', assetData)
|
||||||
|
.set('Repr-Digest', `sha=:${checksum}:`)
|
||||||
|
.set('Upload-Complete', '?0')
|
||||||
|
.set('Upload-Length', '10000')
|
||||||
|
.send();
|
||||||
|
|
||||||
|
expect(createResponse.status).toBe(201);
|
||||||
|
const uploadResource = createResponse.headers['location'];
|
||||||
|
expect(uploadResource).toBeDefined();
|
||||||
|
|
||||||
|
// simulate interrupted upload by starting a request and not completing it
|
||||||
|
const didAbort = new Promise<boolean>((resolve, reject) => {
|
||||||
|
const req = httpRequest(
|
||||||
|
{
|
||||||
|
hostname: 'localhost',
|
||||||
|
port: 2285,
|
||||||
|
path: uploadResource,
|
||||||
|
method: 'PATCH',
|
||||||
|
headers: {
|
||||||
|
Authorization: `Bearer ${user.accessToken}`,
|
||||||
|
'Upload-Draft-Interop-Version': '8',
|
||||||
|
'X-Immich-Asset-Data': assetData,
|
||||||
|
'Repr-Digest': `sha=:${checksum}:`,
|
||||||
|
'Upload-Complete': '?1',
|
||||||
|
'Upload-Length': '10000',
|
||||||
|
'Content-Length': '10000',
|
||||||
|
'Upload-Offset': '0',
|
||||||
|
'Content-Type': 'application/partial-upload',
|
||||||
|
},
|
||||||
|
},
|
||||||
|
(res) => res.on('close', () => resolve(false)),
|
||||||
|
);
|
||||||
|
|
||||||
|
req.on('error', (err) => {
|
||||||
|
console.log('First request error:', err.message);
|
||||||
|
if (err.message === 'socket hang up') {
|
||||||
|
resolve(true);
|
||||||
|
} else {
|
||||||
|
reject(err);
|
||||||
|
}
|
||||||
|
});
|
||||||
|
|
||||||
|
req.write(content.subarray(0, 2000));
|
||||||
|
});
|
||||||
|
|
||||||
|
await setTimeout(50);
|
||||||
|
|
||||||
|
const headResponse = await request(baseUrl)
|
||||||
|
.head(uploadResource)
|
||||||
|
.set('Authorization', `Bearer ${user.accessToken}`)
|
||||||
|
.set('Upload-Draft-Interop-Version', '8');
|
||||||
|
|
||||||
|
expect(headResponse.status).toBe(204);
|
||||||
|
expect(headResponse.headers['upload-offset']).toBe('2000');
|
||||||
|
expect(headResponse.headers['upload-complete']).toBe('?0');
|
||||||
|
|
||||||
|
expect(await didAbort).toBe(true);
|
||||||
|
|
||||||
|
const secondResponse = await request(baseUrl)
|
||||||
|
.patch(uploadResource)
|
||||||
|
.set('Authorization', `Bearer ${user.accessToken}`)
|
||||||
|
.set('Upload-Draft-Interop-Version', '8')
|
||||||
|
.set('X-Immich-Asset-Data', assetData)
|
||||||
|
.set('Repr-Digest', `sha=:${checksum}:`)
|
||||||
|
.set('Upload-Complete', '?1')
|
||||||
|
.set('Upload-Length', '10000')
|
||||||
|
.set('Content-Type', 'application/partial-upload')
|
||||||
|
.set('Upload-Offset', '2000')
|
||||||
|
.send(content.subarray(2000));
|
||||||
|
|
||||||
|
expect(secondResponse.status).toBe(200);
|
||||||
|
});
|
||||||
});
|
});
|
||||||
|
|
||||||
describe('cancelUpload', () => {
|
describe('cancelUpload', () => {
|
||||||
|
|
|
||||||
|
|
@ -38,7 +38,7 @@ export class AssetUploadService extends BaseService {
|
||||||
// we can assume the previous request has already failed on the client end.
|
// we can assume the previous request has already failed on the client end.
|
||||||
private activeRequests = new Map<string, { req: Readable; startTime: Date }>();
|
private activeRequests = new Map<string, { req: Readable; startTime: Date }>();
|
||||||
|
|
||||||
@OnEvent({ name: 'UploadAbort', workers: [ImmichWorker.Api] })
|
@OnEvent({ name: 'UploadAbort', workers: [ImmichWorker.Api], server: true })
|
||||||
onUploadAbort({ assetId, abortTime }: ArgOf<'UploadAbort'>) {
|
onUploadAbort({ assetId, abortTime }: ArgOf<'UploadAbort'>) {
|
||||||
const entry = this.activeRequests.get(assetId);
|
const entry = this.activeRequests.get(assetId);
|
||||||
if (!entry) {
|
if (!entry) {
|
||||||
|
|
@ -80,14 +80,13 @@ export class AssetUploadService extends BaseService {
|
||||||
|
|
||||||
this.addRequest(asset.id, req);
|
this.addRequest(asset.id, req);
|
||||||
let checksumBuffer: Buffer | undefined;
|
let checksumBuffer: Buffer | undefined;
|
||||||
const writeStream = this.storageRepository.createOrAppendWriteStream(asset.path);
|
const writeStream = this.pipe(req, asset.path, contentLength);
|
||||||
if (isComplete) {
|
if (isComplete) {
|
||||||
const hash = createHash('sha1');
|
const hash = createHash('sha1');
|
||||||
req.on('data', (data: Buffer) => hash.update(data));
|
req.on('data', (data: Buffer) => hash.update(data));
|
||||||
writeStream.on('finish', () => (checksumBuffer = hash.digest()));
|
writeStream.on('finish', () => (checksumBuffer = hash.digest()));
|
||||||
}
|
}
|
||||||
req.pipe(writeStream);
|
await new Promise((resolve, reject) => writeStream.on('close', resolve).on('error', reject));
|
||||||
await new Promise((resolve, reject) => writeStream.on('finish', resolve).on('close', reject));
|
|
||||||
this.setCompleteHeader(res, dto.version, isComplete);
|
this.setCompleteHeader(res, dto.version, isComplete);
|
||||||
if (!isComplete) {
|
if (!isComplete) {
|
||||||
res.status(201).set('Location', location).setHeader('Upload-Limit', 'min-size=0').send();
|
res.status(201).set('Location', location).setHeader('Upload-Limit', 'min-size=0').send();
|
||||||
|
|
@ -139,9 +138,8 @@ export class AssetUploadService extends BaseService {
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
const writeStream = this.storageRepository.createOrAppendWriteStream(path);
|
const writeStream = this.pipe(req, path, contentLength);
|
||||||
req.pipe(writeStream);
|
await new Promise((resolve, reject) => writeStream.on('close', resolve).on('error', reject));
|
||||||
await new Promise((resolve, reject) => writeStream.on('finish', resolve).on('close', reject));
|
|
||||||
this.setCompleteHeader(res, version, isComplete);
|
this.setCompleteHeader(res, version, isComplete);
|
||||||
if (!isComplete) {
|
if (!isComplete) {
|
||||||
try {
|
try {
|
||||||
|
|
@ -334,6 +332,27 @@ export class AssetUploadService extends BaseService {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
private pipe(req: Readable, path: string, size: number) {
|
||||||
|
const writeStream = this.storageRepository.createOrAppendWriteStream(path);
|
||||||
|
let receivedLength = 0;
|
||||||
|
req.on('data', (data: Buffer) => {
|
||||||
|
receivedLength += data.length;
|
||||||
|
if (!writeStream.write(data)) {
|
||||||
|
req.pause();
|
||||||
|
writeStream.once('drain', () => req.resume());
|
||||||
|
}
|
||||||
|
});
|
||||||
|
|
||||||
|
req.on('close', () => {
|
||||||
|
if (receivedLength < size) {
|
||||||
|
writeStream.emit('error', new Error('Request closed before all data received'));
|
||||||
|
}
|
||||||
|
writeStream.end();
|
||||||
|
});
|
||||||
|
|
||||||
|
return writeStream;
|
||||||
|
}
|
||||||
|
|
||||||
private sendInterimResponse({ socket }: Response, location: string, interopVersion: number): void {
|
private sendInterimResponse({ socket }: Response, location: string, interopVersion: number): void {
|
||||||
if (socket && !socket.destroyed) {
|
if (socket && !socket.destroyed) {
|
||||||
// Express doesn't understand interim responses, so write directly to socket
|
// Express doesn't understand interim responses, so write directly to socket
|
||||||
|
|
|
||||||
Loading…
Add table
Add a link
Reference in a new issue