proactive abortion

mertalev 2025-10-08 00:02:36 -04:00
parent 1915e3ceb2
commit 758553672a
5 changed files with 79 additions and 58 deletions


@@ -318,6 +318,24 @@ describe('/upload', () => {
       expect(status).toBe(400);
       expect(body).toEqual(errorDto.badRequest('Quota has been exceeded!'));
     });
+
+    it('should reject when request body is larger than declared content length', async () => {
+      const length = 1024 * 1024;
+      const content = randomBytes(length);
+      const { status } = await request(app)
+        .post('/upload')
+        .set('Authorization', `Bearer ${user.accessToken}`)
+        .set('Upload-Draft-Interop-Version', '8')
+        .set('X-Immich-Asset-Data', assetData)
+        .set('Repr-Digest', `sha=:${createHash('sha1').update(content).digest('base64')}:`)
+        .set('Upload-Complete', '?0')
+        .set('Upload-Length', length.toString())
+        .set('Content-Length', (length - 1).toString())
+        .send(content);
+
+      expect(status).toBe(400);
+    });
   });

   describe('resumeUpload', () => {
@@ -531,7 +549,7 @@ describe('/upload', () => {
       expect(resumeResponse.headers['upload-complete']).toBe('?1');
     });

-    it('should handle multiple interruptions and resumptions', async () => {
+    it('should handle multiple chunks', async () => {
       const chunks = [randomBytes(2000), randomBytes(3000), randomBytes(5000)];
       const hash = createHash('sha1');
       for (const chunk of chunks) {


@@ -42,7 +42,6 @@ const apiContentLength = {
 };

 // This is important to let go of the asset lock for an inactive request
-// TODO: the recommendation is for a later request to explicitly abort the inactive one rather than waiting for timeout
 const SOCKET_TIMEOUT_MS = 30_000;

 @ApiTags('Upload')
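
The comment retained above is the crux of this file's change: an inactive upload request holds the per-asset lock, and the socket timeout is the backstop that eventually frees it. The removed TODO is what the rest of this commit implements, since a later request now aborts the stale one explicitly instead of waiting out the timeout. As a rough sketch of how such a timeout can be wired up in Express (the middleware name and placement here are assumptions, not the actual controller code):

import { NextFunction, Request, Response } from 'express';

const SOCKET_TIMEOUT_MS = 30_000;

// Hypothetical middleware: destroy a request after 30s of socket inactivity
// so a stalled upload eventually releases the asset lock it is holding.
export function uploadSocketTimeout(req: Request, _res: Response, next: NextFunction) {
  req.socket.setTimeout(SOCKET_TIMEOUT_MS, () => req.destroy());
  next();
}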


@@ -453,15 +453,13 @@ export class DatabaseRepository {
   async withUuidLock<R>(uuid: string, callback: () => Promise<R>): Promise<R> {
     let res;
-    await this.asyncLock.acquire(uuid, async () => {
-      await this.db.connection().execute(async (connection) => {
-        try {
-          await this.acquireUuidLock(uuid, connection);
-          res = await callback();
-        } finally {
-          await this.releaseUuidLock(uuid, connection);
-        }
-      });
+    await this.db.connection().execute(async (connection) => {
+      try {
+        await this.acquireUuidLock(uuid, connection);
+        res = await callback();
+      } finally {
+        await this.releaseUuidLock(uuid, connection);
+      }
     });
     return res as R;
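
This change drops the in-process asyncLock wrapper around the database lock: with stale requests now aborted proactively, serializing lock waiters inside a single process no longer adds anything, and the database-level uuid lock alone guards the critical section. The callback still runs inside one connection().execute block because a session-level lock must be released on the same connection that acquired it. For context, a hedged sketch of what the acquire/release helpers could look like with Postgres advisory locks (an assumption for illustration; the real helpers live elsewhere in DatabaseRepository):

import { Kysely, sql } from 'kysely';

// Assumed sketch: map the uuid to a bigint key and take a session-level
// Postgres advisory lock on it. hashtext() collisions are tolerable here,
// since a collision merely serializes two unrelated uploads.
async function acquireUuidLock(db: Kysely<any>, uuid: string): Promise<void> {
  await sql`SELECT pg_advisory_lock(hashtext(${uuid}))`.execute(db);
}

async function releaseUuidLock(db: Kysely<any>, uuid: string): Promise<void> {
  await sql`SELECT pg_advisory_unlock(hashtext(${uuid}))`.execute(db);
}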


@@ -80,6 +80,9 @@ type EventMap = {
   // stack bulk events
   StackDeleteAll: [{ stackIds: string[]; userId: string }];

+  // upload events
+  UploadAbort: [{ assetId: string; abortTime: Date }];
+
   // user events
   UserSignup: [{ notify: boolean; id: string; password?: string }];
@@ -87,7 +90,7 @@ type EventMap = {
   WebsocketConnect: [{ userId: string }];
 };

-export const serverEvents = ['ConfigUpdate'] as const;
+export const serverEvents = ['ConfigUpdate', 'UploadAbort'] as const;
 export type ServerEvents = (typeof serverEvents)[number];
 export type EmitEvent = keyof EventMap;


@@ -6,7 +6,7 @@ import { extname, join } from 'node:path';
 import { Readable } from 'node:stream';
 import { JOBS_ASSET_PAGINATION_SIZE } from 'src/constants';
 import { StorageCore } from 'src/cores/storage.core';
-import { OnJob } from 'src/decorators';
+import { OnEvent, OnJob } from 'src/decorators';
 import { GetUploadStatusDto, ResumeUploadDto, StartUploadDto } from 'src/dtos/asset-upload';
 import { AuthDto } from 'src/dtos/auth.dto';
 import {
@@ -14,11 +14,13 @@ import {
   AssetStatus,
   AssetType,
   AssetVisibility,
+  ImmichWorker,
   JobName,
   JobStatus,
   QueueName,
   StorageFolder,
 } from 'src/enum';
+import { ArgOf } from 'src/repositories/event.repository';
 import { BaseService } from 'src/services/base.service';
 import { JobItem, JobOf } from 'src/types';
 import { isAssetChecksumConstraint } from 'src/utils/database';
@@ -29,6 +31,24 @@ export const MAX_RUFH_INTEROP_VERSION = 8;

 @Injectable()
 export class AssetUploadService extends BaseService {
+  // This is used to proactively abort previous requests for the same asset
+  // when a new one arrives. The previous request still holds the asset lock
+  // and would prevent the new request from proceeding until it times out.
+  // Since normal clients do not send concurrent requests, we can assume the
+  // previous request has already failed on the client end.
+  private activeRequests = new Map<string, { req: Readable; startTime: Date }>();
+
+  @OnEvent({ name: 'UploadAbort', workers: [ImmichWorker.Api] })
+  onUploadAbort({ assetId, abortTime }: ArgOf<'UploadAbort'>) {
+    const entry = this.activeRequests.get(assetId);
+    if (entry && abortTime > entry.startTime) {
+      this.activeRequests.delete(assetId);
+      entry.req.destroy();
+      return true;
+    }
+    return false;
+  }
+
   async startUpload(auth: AuthDto, req: Readable, res: Response, dto: StartUploadDto): Promise<void> {
     this.logger.verboseFn(() => `Starting upload: ${JSON.stringify(dto)}`);
     const { isComplete, assetData, uploadLength, contentLength, version } = dto;
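
The handler above is the receiving end of the abort path: it only destroys the tracked request when the abort is newer than the request's start time, which keeps a delayed UploadAbort event from killing the very request that triggered it on another worker. The cleanup added later in this diff hinges on the 'close' event that destruction fires. A tiny runnable demonstration of that stream primitive (plain Node behavior, not Immich code):

import { PassThrough } from 'node:stream';

// Destroying a request-like Readable mid-flight emits 'error' (when an error
// is supplied) followed by 'close'; 'close' is what the upload service hooks
// to drop its activeRequests entry so the lock holder can clean up.
const req = new PassThrough();
req.on('error', (error) => console.log(`aborted: ${error.message}`));
req.on('close', () => console.log('closed; lock holder can clean up'));

req.write('partial chunk');
req.destroy(new Error('superseded by a newer request'));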
@@ -57,15 +77,15 @@ export class AssetUploadService extends BaseService {
       this.sendInterimResponse(res, location, version);
     }

+    this.addRequest(asset.id, req);
     let checksumBuffer: Buffer | undefined;
-    const metadata = { id: asset.id, path: asset.path, size: contentLength, fileModifiedAt: assetData.fileModifiedAt };
-    const writeStream = this.pipe(req, res, metadata);
+    const writeStream = this.storageRepository.createOrAppendWriteStream(asset.path);
     if (isComplete) {
       const hash = createHash('sha1');
       req.on('data', (data: Buffer) => hash.update(data));
       writeStream.on('finish', () => (checksumBuffer = hash.digest()));
     }
+    req.pipe(writeStream);
     await new Promise((resolve, reject) => writeStream.on('finish', resolve).on('close', reject));
     this.setCompleteHeader(res, dto.version, isComplete);
     if (!isComplete) {
@@ -77,7 +97,7 @@ export class AssetUploadService extends BaseService {
       return await this.sendChecksumMismatch(res, asset.id, asset.path);
     }

-    await this.onComplete(metadata);
+    await this.onComplete({ id: asset.id, path: asset.path, fileModifiedAt: assetData.fileModifiedAt });
     res.status(200).send({ id: asset.id });
   }
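
Both upload paths now share the same completion wait: 'finish' fires once every chunk has been flushed to the file, while 'close' without a prior 'finish' means the stream was destroyed mid-write (for example by onUploadAbort), so the promise rejects. Because a settled promise ignores later events, the 'close' that normally follows 'finish' is harmless. The same idea as a standalone helper (a sketch; the service inlines the promise):

import { Writable } from 'node:stream';

// Resolve when all data has been flushed; reject if the stream closes first,
// i.e. it was destroyed before finishing. The 'close' that follows a normal
// 'finish' is ignored because the promise is already settled.
const waitForFlush = (stream: Writable): Promise<void> =>
  new Promise((resolve, reject) => {
    stream.on('finish', () => resolve());
    stream.on('close', () => reject(new Error('stream closed before finishing')));
  });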
@@ -85,6 +105,7 @@ export class AssetUploadService extends BaseService {
     this.logger.verboseFn(() => `Resuming upload for ${id}: ${JSON.stringify(dto)}`);
     const { isComplete, uploadLength, uploadOffset, contentLength, version } = dto;
     this.setCompleteHeader(res, version, false);
+    this.addRequest(id, req);
     return this.databaseRepository.withUuidLock(id, async () => {
       const completionData = await this.assetRepository.getCompletionMetadata(id, auth.user.id);
       if (!completionData) {
@@ -117,8 +138,8 @@ export class AssetUploadService extends BaseService {
         return;
       }

-      const metadata = { id, path, size: contentLength, fileModifiedAt };
-      const writeStream = this.pipe(req, res, metadata);
+      const writeStream = this.storageRepository.createOrAppendWriteStream(path);
+      req.pipe(writeStream);
       await new Promise((resolve, reject) => writeStream.on('finish', resolve).on('close', reject));
       this.setCompleteHeader(res, version, isComplete);
       if (!isComplete) {
@@ -138,12 +159,13 @@ export class AssetUploadService extends BaseService {
         return await this.sendChecksumMismatch(res, id, path);
       }

-      await this.onComplete(metadata);
+      await this.onComplete({ id, path, fileModifiedAt });
       res.status(200).send({ id });
     });
   }

   cancelUpload(auth: AuthDto, assetId: string, res: Response): Promise<void> {
+    this.abortExistingRequest(assetId);
     return this.databaseRepository.withUuidLock(assetId, async () => {
       const asset = await this.assetRepository.getCompletionMetadata(assetId, auth.user.id);
       if (!asset) {
@@ -160,6 +182,7 @@ export class AssetUploadService extends BaseService {
   async getUploadStatus(auth: AuthDto, res: Response, id: string, { version }: GetUploadStatusDto): Promise<void> {
     this.logger.verboseFn(() => `Getting upload status for ${id} with version ${version}`);
+    this.abortExistingRequest(id);
     return this.databaseRepository.withUuidLock(id, async () => {
       const asset = await this.assetRepository.getCompletionMetadata(id, auth.user.id);
       if (!asset) {
@@ -290,45 +313,24 @@ export class AssetUploadService extends BaseService {
     await withRetry(() => this.assetRepository.removeAndDecrementQuota(assetId));
   }

-  private pipe(req: Readable, res: Response, { id, path, size }: { id: string; path: string; size: number }) {
-    const writeStream = this.storageRepository.createOrAppendWriteStream(path);
-    writeStream.on('error', (error) => {
-      this.logger.error(`Failed to write chunk to ${path}: ${error.message}`);
-      if (!res.headersSent) {
-        res.status(500).send();
-      }
-    });
-
-    req.on('error', (error) => {
-      this.logger.error(`Failed to read request body: ${error.message}`);
-      if (!res.headersSent) {
-        res.status(500).send();
-      }
-    });
-
-    let receivedLength = 0;
-    req.on('data', (data: Buffer) => {
-      if (receivedLength + data.length > size) {
-        writeStream.destroy();
-        void this.onCancel(id, path).catch((error: any) =>
-          this.logger.error(`Failed to remove ${id} after too much data: ${error.message}`),
-        );
-        if (!res.headersSent) {
-          res.status(400).send('Received more data than specified in content-length');
-        }
-        res.on('finish', () => req.destroy());
-        return;
-      }
-      receivedLength += data.length;
-      if (!writeStream.write(data)) {
-        req.pause();
-        writeStream.once('drain', () => req.resume());
-      }
-    });
-
-    req.on('end', () => writeStream.end());
-    return writeStream;
+  private addRequest(assetId: string, req: Readable) {
+    const addTime = new Date();
+    const activeRequest = { req, startTime: addTime };
+    this.abortExistingRequest(assetId, addTime);
+    this.activeRequests.set(assetId, activeRequest);
+    req.on('close', () => {
+      if (this.activeRequests.get(assetId)?.req === req) {
+        this.activeRequests.delete(assetId);
+      }
+    });
+  }
+
+  private abortExistingRequest(assetId: string, abortTime = new Date()) {
+    const abortEvent = { assetId, abortTime };
+    // only emit if we didn't just abort it ourselves
+    if (!this.onUploadAbort(abortEvent)) {
+      this.eventRepository.serverSend('UploadAbort', abortEvent);
+    }
   }

   private sendInterimResponse({ socket }: Response, location: string, interopVersion: number): void {
@@ -373,11 +375,12 @@ export class AssetUploadService extends BaseService {
   }

   private validateQuota(auth: AuthDto, size: number): void {
-    if (auth.user.quotaSizeInBytes === null) {
+    const { quotaSizeInBytes: quotaLimit, quotaUsageInBytes: currentUsage } = auth.user;
+    if (quotaLimit === null) {
       return;
     }

-    if (auth.user.quotaSizeInBytes < auth.user.quotaUsageInBytes + size) {
+    if (quotaLimit < currentUsage + size) {
       throw new BadRequestException('Quota has been exceeded!');
     }
   }
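
The quota logic is unchanged by this destructuring cleanup: a null limit means unlimited storage, and an upload is rejected when it would push usage past the limit. The same predicate in isolation, with illustrative numbers (assumed values, not from the commit):

// Standalone restatement of the check above.
const wouldExceedQuota = (quotaLimit: number | null, currentUsage: number, size: number): boolean =>
  quotaLimit !== null && quotaLimit < currentUsage + size;

// 1 GiB limit, 900 MiB used, 200 MiB upload: exceeds, so the request is rejected.
console.log(wouldExceedQuota(1024 ** 3, 900 * 1024 ** 2, 200 * 1024 ** 2)); // true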