2025-10-06 19:47:27 -04:00
|
|
|
import { BadRequestException, Injectable, InternalServerErrorException } from '@nestjs/common';
|
2025-09-29 18:09:06 -04:00
|
|
|
import { Response } from 'express';
|
2025-10-02 18:34:49 -04:00
|
|
|
import { DateTime } from 'luxon';
|
2025-09-24 13:56:46 -04:00
|
|
|
import { createHash } from 'node:crypto';
|
2025-10-12 19:18:52 -04:00
|
|
|
import { dirname, extname, join } from 'node:path';
|
|
|
|
|
import { Readable, Writable } from 'node:stream';
|
2025-10-10 19:26:22 -04:00
|
|
|
import { SystemConfig } from 'src/config';
|
2025-10-02 18:34:49 -04:00
|
|
|
import { JOBS_ASSET_PAGINATION_SIZE } from 'src/constants';
|
2025-09-24 13:56:46 -04:00
|
|
|
import { StorageCore } from 'src/cores/storage.core';
|
2025-10-08 00:02:36 -04:00
|
|
|
import { OnEvent, OnJob } from 'src/decorators';
|
2025-10-09 16:12:47 -04:00
|
|
|
import { GetUploadStatusDto, ResumeUploadDto, StartUploadDto } from 'src/dtos/asset-upload.dto';
|
2025-09-24 13:56:46 -04:00
|
|
|
import { AuthDto } from 'src/dtos/auth.dto';
|
2025-10-03 01:24:38 -04:00
|
|
|
import {
|
|
|
|
|
AssetMetadataKey,
|
|
|
|
|
AssetStatus,
|
|
|
|
|
AssetType,
|
|
|
|
|
AssetVisibility,
|
2025-10-08 00:02:36 -04:00
|
|
|
ImmichWorker,
|
2025-10-03 01:24:38 -04:00
|
|
|
JobName,
|
|
|
|
|
JobStatus,
|
|
|
|
|
QueueName,
|
|
|
|
|
StorageFolder,
|
|
|
|
|
} from 'src/enum';
|
2025-10-08 00:02:36 -04:00
|
|
|
import { ArgOf } from 'src/repositories/event.repository';
|
2025-09-24 13:56:46 -04:00
|
|
|
import { BaseService } from 'src/services/base.service';
|
2025-10-02 18:34:49 -04:00
|
|
|
import { JobItem, JobOf } from 'src/types';
|
2025-09-24 13:56:46 -04:00
|
|
|
import { isAssetChecksumConstraint } from 'src/utils/database';
|
|
|
|
|
import { mimeTypes } from 'src/utils/mime-types';
|
2025-10-01 11:57:29 -04:00
|
|
|
import { withRetry } from 'src/utils/misc';
|
2025-09-24 13:56:46 -04:00
|
|
|
|
2025-09-29 18:09:06 -04:00
|
|
|
// Highest draft interop version of the resumable-upload (RUFH) protocol this
// server implements; advertised via the Upload-Draft-Interop-Version header.
export const MAX_RUFH_INTEROP_VERSION = 8;
|
2025-09-29 04:31:47 -04:00
|
|
|
|
2025-09-24 13:56:46 -04:00
|
|
|
@Injectable()
|
|
|
|
|
export class AssetUploadService extends BaseService {
|
2025-10-08 00:02:36 -04:00
|
|
|
  // This is used to proactively abort previous requests for the same asset
  // when a new one arrives. The previous request still holds the asset lock
  // and will prevent the new request from proceeding until the previous one
  // times out. As normal client behavior will not have concurrent requests,
  // we can assume the previous request has already failed on the client end.
  // Keyed by asset id; startTime is compared against the abort timestamp in
  // onUploadAbort so an abort never tears down a request newer than itself.
  private activeRequests = new Map<string, { req: Readable; startTime: Date }>();
|
|
|
|
|
|
2025-10-08 16:41:42 -04:00
|
|
|
@OnEvent({ name: 'UploadAbort', workers: [ImmichWorker.Api], server: true })
|
2025-10-08 00:02:36 -04:00
|
|
|
onUploadAbort({ assetId, abortTime }: ArgOf<'UploadAbort'>) {
|
|
|
|
|
const entry = this.activeRequests.get(assetId);
|
2025-10-08 00:41:47 -04:00
|
|
|
if (!entry) {
|
|
|
|
|
return false;
|
|
|
|
|
}
|
|
|
|
|
if (abortTime > entry.startTime) {
|
2025-10-08 00:02:36 -04:00
|
|
|
entry.req.destroy();
|
2025-10-09 17:36:52 -04:00
|
|
|
this.activeRequests.delete(assetId);
|
2025-10-08 00:02:36 -04:00
|
|
|
}
|
2025-10-09 17:36:52 -04:00
|
|
|
return true;
|
2025-10-08 00:02:36 -04:00
|
|
|
}
|
|
|
|
|
|
2025-10-03 01:24:38 -04:00
|
|
|
  /**
   * Handles the initial upload request for an asset.
   *
   * Supports both a conventional single-shot upload and the resumable (RUFH)
   * protocol: a request is treated as resumable only when it carries both an
   * interop version and an explicit Upload-Complete value. Creates the asset
   * record (or detects a checksum duplicate), streams the request body to disk
   * under a per-asset UUID lock, and finalizes the asset when the upload is
   * complete and the checksum matches.
   */
  async startUpload(auth: AuthDto, req: Readable, res: Response, dto: StartUploadDto): Promise<void> {
    this.logger.verboseFn(() => `Starting upload: ${JSON.stringify(dto)}`);
    const { uploadComplete, assetData, uploadLength, contentLength, version } = dto;
    // uploadComplete === undefined means a plain upload, which is complete by definition.
    const isComplete = uploadComplete !== false;
    const isResumable = version && uploadComplete !== undefined;
    const { backup } = await this.getConfig({ withCache: true });

    const asset = await this.onStart(auth, dto);
    if (asset.isDuplicate) {
      if (asset.status !== AssetStatus.Partial) {
        return this.sendAlreadyCompleted(res);
      }

      const location = `/api/upload/${asset.id}`;
      if (isResumable) {
        this.sendInterimResponse(res, location, version, this.getUploadLimits(backup));
        // this is a 5xx to indicate the client should do offset retrieval and resume
        res.status(500).send('Incomplete asset already exists');
        return;
      }
    }

    // For a complete upload, the declared total length must equal the body length.
    if (isComplete && uploadLength !== contentLength) {
      return this.sendInconsistentLength(res);
    }

    const location = `/api/upload/${asset.id}`;
    if (isResumable) {
      // 104 interim response tells the client resumption is supported.
      this.sendInterimResponse(res, location, version, this.getUploadLimits(backup));
    }

    // Track the request so a newer request (or another worker) can abort it.
    this.addRequest(asset.id, req);
    await this.databaseRepository.withUuidLock(asset.id, async () => {
      // conventional upload, check status again with lock acquired before overwriting
      if (asset.isDuplicate) {
        const existingAsset = await this.assetRepository.getCompletionMetadata(asset.id, auth.user.id);
        if (existingAsset?.status !== AssetStatus.Partial) {
          return this.sendAlreadyCompleted(res);
        }
      }
      await this.storageRepository.mkdir(dirname(asset.path));

      let checksumBuffer: Buffer | undefined;
      // A duplicate partial is overwritten from scratch; a new asset may append
      // to a file left behind by an earlier attempt.
      const writeStream = asset.isDuplicate
        ? this.storageRepository.createWriteStream(asset.path)
        : this.storageRepository.createOrAppendWriteStream(asset.path);
      this.pipe(req, writeStream, contentLength);
      if (isComplete) {
        // Hash the body as it streams; 'finish' fires before 'close', so the
        // digest is ready once the write stream has closed below.
        const hash = createHash('sha1');
        req.on('data', (data: Buffer) => hash.update(data));
        writeStream.on('finish', () => (checksumBuffer = hash.digest()));
      }
      await new Promise((resolve, reject) => writeStream.on('close', resolve).on('error', reject));
      if (isResumable) {
        this.setCompleteHeader(res, version, uploadComplete);
      }
      if (!isComplete) {
        // Partial resumable upload: acknowledge creation and wait for resume.
        res.status(201).set('Location', location).setHeader('Upload-Limit', this.getUploadLimits(backup)).send();
        return;
      }
      if (dto.checksum.compare(checksumBuffer!) !== 0) {
        return await this.sendChecksumMismatch(res, asset.id, asset.path);
      }

      await this.onComplete({ id: asset.id, path: asset.path, fileModifiedAt: assetData.fileModifiedAt });
      res.status(200).send({ id: asset.id });
    });
  }
|
|
|
|
|
|
2025-10-03 01:24:38 -04:00
|
|
|
  /**
   * Appends a chunk to an existing partial upload at the declared offset.
   *
   * Validates, under the per-asset UUID lock, that the asset exists and is
   * still partial, that the declared length is consistent, and that the
   * client's offset matches the bytes already on disk. When the chunk marks
   * the upload complete, verifies the whole-file checksum and finalizes.
   */
  resumeUpload(auth: AuthDto, req: Readable, res: Response, id: string, dto: ResumeUploadDto): Promise<void> {
    this.logger.verboseFn(() => `Resuming upload for ${id}: ${JSON.stringify(dto)}`);
    const { uploadComplete, uploadLength, uploadOffset, contentLength, version } = dto;
    // Default to "incomplete" until the write below proves otherwise.
    this.setCompleteHeader(res, version, false);
    this.addRequest(id, req);
    return this.databaseRepository.withUuidLock(id, async () => {
      const completionData = await this.assetRepository.getCompletionMetadata(id, auth.user.id);
      if (!completionData) {
        res.status(404).send('Asset not found');
        return;
      }
      const { fileModifiedAt, path, status, checksum: providedChecksum, size } = completionData;

      if (status !== AssetStatus.Partial) {
        return this.sendAlreadyCompleted(res);
      }

      // A total length declared now must agree with the one stored at start.
      if (uploadLength && size && size !== uploadLength) {
        return this.sendInconsistentLength(res);
      }

      // The client's offset must match the bytes already persisted on disk.
      const expectedOffset = await this.getCurrentOffset(path);
      if (expectedOffset !== uploadOffset) {
        return this.sendOffsetMismatch(res, expectedOffset, uploadOffset);
      }

      const newLength = uploadOffset + contentLength;
      if (uploadLength !== undefined && newLength > uploadLength) {
        res.status(400).send('Upload would exceed declared length');
        return;
      }

      // Empty non-final chunk: nothing to write, just report the offset.
      if (contentLength === 0 && !uploadComplete) {
        res.status(204).setHeader('Upload-Offset', expectedOffset.toString()).send();
        return;
      }

      const writeStream = this.storageRepository.createOrAppendWriteStream(path);
      this.pipe(req, writeStream, contentLength);
      await new Promise((resolve, reject) => writeStream.on('close', resolve).on('error', reject));
      this.setCompleteHeader(res, version, uploadComplete);

      if (!uploadComplete) {
        try {
          // Re-stat after the write so the client learns the authoritative offset.
          const offset = await this.getCurrentOffset(path);
          res.status(204).setHeader('Upload-Offset', offset.toString()).send();
        } catch {
          this.logger.error(`Failed to get current offset for ${path} after write`);
          res.status(500).send();
        }
        return;
      }

      // Final chunk: verify the whole file against the checksum from start.
      const checksum = await this.cryptoRepository.hashFile(path);
      if (providedChecksum.compare(checksum) !== 0) {
        return await this.sendChecksumMismatch(res, id, path);
      }

      await this.onComplete({ id, path, fileModifiedAt });
      res.status(200).send({ id });
    });
  }
|
2025-09-24 13:56:46 -04:00
|
|
|
|
2025-10-03 01:24:38 -04:00
|
|
|
cancelUpload(auth: AuthDto, assetId: string, res: Response): Promise<void> {
|
2025-10-08 00:02:36 -04:00
|
|
|
this.abortExistingRequest(assetId);
|
2025-10-01 23:57:02 -04:00
|
|
|
return this.databaseRepository.withUuidLock(assetId, async () => {
|
|
|
|
|
const asset = await this.assetRepository.getCompletionMetadata(assetId, auth.user.id);
|
|
|
|
|
if (!asset) {
|
2025-10-03 01:24:38 -04:00
|
|
|
res.status(404).send('Asset not found');
|
2025-10-01 23:57:02 -04:00
|
|
|
return;
|
|
|
|
|
}
|
|
|
|
|
if (asset.status !== AssetStatus.Partial) {
|
2025-10-06 23:32:20 -04:00
|
|
|
return this.sendAlreadyCompleted(res);
|
2025-10-01 23:57:02 -04:00
|
|
|
}
|
|
|
|
|
await this.onCancel(assetId, asset.path);
|
2025-10-03 01:24:38 -04:00
|
|
|
res.status(204).send();
|
2025-10-01 23:57:02 -04:00
|
|
|
});
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
async getUploadStatus(auth: AuthDto, res: Response, id: string, { version }: GetUploadStatusDto): Promise<void> {
|
2025-10-03 01:24:38 -04:00
|
|
|
this.logger.verboseFn(() => `Getting upload status for ${id} with version ${version}`);
|
2025-10-10 19:26:22 -04:00
|
|
|
const { backup } = await this.getConfig({ withCache: true });
|
2025-10-08 00:02:36 -04:00
|
|
|
this.abortExistingRequest(id);
|
2025-10-01 23:57:02 -04:00
|
|
|
return this.databaseRepository.withUuidLock(id, async () => {
|
|
|
|
|
const asset = await this.assetRepository.getCompletionMetadata(id, auth.user.id);
|
|
|
|
|
if (!asset) {
|
|
|
|
|
res.status(404).send('Asset not found');
|
|
|
|
|
return;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
const offset = await this.getCurrentOffset(asset.path);
|
|
|
|
|
this.setCompleteHeader(res, version, asset.status !== AssetStatus.Partial);
|
|
|
|
|
res
|
|
|
|
|
.status(204)
|
|
|
|
|
.setHeader('Upload-Offset', offset.toString())
|
|
|
|
|
.setHeader('Cache-Control', 'no-store')
|
2025-10-10 19:26:22 -04:00
|
|
|
.setHeader('Upload-Limit', this.getUploadLimits(backup))
|
2025-10-01 23:57:02 -04:00
|
|
|
.send();
|
|
|
|
|
});
|
|
|
|
|
}
|
|
|
|
|
|
2025-10-10 19:26:22 -04:00
|
|
|
async getUploadOptions(res: Response): Promise<void> {
|
|
|
|
|
const { backup } = await this.getConfig({ withCache: true });
|
|
|
|
|
res.status(204).setHeader('Upload-Limit', this.getUploadLimits(backup)).send();
|
|
|
|
|
}
|
|
|
|
|
|
2025-10-06 19:47:27 -04:00
|
|
|
@OnJob({ name: JobName.PartialAssetCleanupQueueAll, queue: QueueName.BackgroundTask })
|
2025-10-02 18:34:49 -04:00
|
|
|
async removeStaleUploads(): Promise<void> {
|
2025-10-09 16:38:21 -04:00
|
|
|
const config = await this.getConfig({ withCache: false });
|
2025-10-10 19:26:22 -04:00
|
|
|
const createdBefore = DateTime.now().minus({ hours: config.backup.upload.maxAgeHours }).toJSDate();
|
2025-10-02 18:34:49 -04:00
|
|
|
let jobs: JobItem[] = [];
|
|
|
|
|
const assets = this.assetJobRepository.streamForPartialAssetCleanupJob(createdBefore);
|
|
|
|
|
for await (const asset of assets) {
|
2025-10-06 19:47:27 -04:00
|
|
|
jobs.push({ name: JobName.PartialAssetCleanup, data: asset });
|
2025-10-02 18:34:49 -04:00
|
|
|
if (jobs.length >= JOBS_ASSET_PAGINATION_SIZE) {
|
|
|
|
|
await this.jobRepository.queueAll(jobs);
|
|
|
|
|
jobs = [];
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
await this.jobRepository.queueAll(jobs);
|
|
|
|
|
}
|
|
|
|
|
|
2025-10-06 19:47:27 -04:00
|
|
|
@OnJob({ name: JobName.PartialAssetCleanup, queue: QueueName.BackgroundTask })
|
|
|
|
|
removeStaleUpload({ id }: JobOf<JobName.PartialAssetCleanup>): Promise<JobStatus> {
|
2025-10-02 18:34:49 -04:00
|
|
|
return this.databaseRepository.withUuidLock(id, async () => {
|
|
|
|
|
const asset = await this.assetJobRepository.getForPartialAssetCleanupJob(id);
|
|
|
|
|
if (!asset) {
|
|
|
|
|
return JobStatus.Skipped;
|
|
|
|
|
}
|
|
|
|
|
const { checksum, fileModifiedAt, path, size } = asset;
|
|
|
|
|
try {
|
|
|
|
|
const stat = await this.storageRepository.stat(path);
|
|
|
|
|
if (size === stat.size && checksum === (await this.cryptoRepository.hashFile(path))) {
|
|
|
|
|
await this.onComplete({ id, path, fileModifiedAt });
|
|
|
|
|
return JobStatus.Success;
|
|
|
|
|
}
|
|
|
|
|
} catch (error: any) {
|
|
|
|
|
this.logger.debugFn(() => `Failed to check upload file ${path}: ${error.message}`);
|
|
|
|
|
}
|
|
|
|
|
await this.onCancel(id, path);
|
|
|
|
|
return JobStatus.Success;
|
|
|
|
|
});
|
|
|
|
|
}
|
|
|
|
|
|
2025-10-06 19:47:27 -04:00
|
|
|
  /**
   * Creates the database record for a new upload and reserves its on-disk path.
   *
   * Generates the asset id and storage path, rejects unsupported file types,
   * enforces the user's quota against the declared length, and inserts the
   * asset in the Partial state. If the insert trips the per-user checksum
   * uniqueness constraint, the existing asset is looked up and returned with
   * isDuplicate=true so the caller can resume or reject it.
   *
   * @throws BadRequestException for unsupported file types or exceeded quota.
   * @throws InternalServerErrorException for unexpected insert failures.
   */
  async onStart(
    auth: AuthDto,
    { assetData, checksum, uploadLength }: StartUploadDto,
  ): Promise<{ id: string; path: string; status: AssetStatus; isDuplicate: boolean }> {
    const assetId = this.cryptoRepository.randomUUID();
    const folder = StorageCore.getNestedFolder(StorageFolder.Upload, auth.user.id, assetId);
    const extension = extname(assetData.filename);
    const path = join(folder, `${assetId}${extension}`);
    // Type is derived from the path (i.e. the original filename's extension).
    const type = mimeTypes.assetType(path);

    if (type === AssetType.Other) {
      throw new BadRequestException(`${assetData.filename} is an unsupported file type`);
    }

    this.validateQuota(auth, uploadLength);

    try {
      await this.assetRepository.createWithMetadata(
        {
          id: assetId,
          ownerId: auth.user.id,
          libraryId: null,
          checksum,
          originalPath: path,
          deviceAssetId: assetData.deviceAssetId,
          deviceId: assetData.deviceId,
          fileCreatedAt: assetData.fileCreatedAt,
          fileModifiedAt: assetData.fileModifiedAt,
          localDateTime: assetData.fileCreatedAt,
          type,
          isFavorite: assetData.isFavorite,
          livePhotoVideoId: assetData.livePhotoVideoId,
          // Hidden until the upload completes and metadata extraction runs.
          visibility: AssetVisibility.Hidden,
          originalFileName: assetData.filename,
          status: AssetStatus.Partial,
        },
        uploadLength,
        assetData.iCloudId ? [{ key: AssetMetadataKey.MobileApp, value: { iCloudId: assetData.iCloudId } }] : undefined,
      );
    } catch (error: any) {
      // Anything other than the checksum-uniqueness violation is unexpected.
      if (!isAssetChecksumConstraint(error)) {
        this.logger.error(`Error creating upload asset record: ${error.message}`);
        throw new InternalServerErrorException('Error creating asset');
      }

      // Checksum collision: this user already has an asset with these bytes.
      const duplicate = await this.assetRepository.getUploadAssetIdByChecksum(auth.user.id, checksum);
      if (!duplicate) {
        throw new InternalServerErrorException('Error locating duplicate for checksum constraint');
      }

      // NOTE: `path` here is the newly reserved path, not the duplicate's
      // original path; callers overwrite/append at this path when resuming.
      return { id: duplicate.id, path, status: duplicate.status, isDuplicate: true };
    }

    return { id: assetId, path, status: AssetStatus.Partial, isDuplicate: false };
  }
|
|
|
|
|
|
|
|
|
|
async onComplete({ id, path, fileModifiedAt }: { id: string; path: string; fileModifiedAt: Date }) {
|
2025-10-09 19:55:18 -04:00
|
|
|
this.logger.log('Completing upload for asset', id);
|
2025-10-06 23:32:20 -04:00
|
|
|
const jobData = { name: JobName.AssetExtractMetadata, data: { id, source: 'upload' } } as const;
|
2025-10-06 19:47:27 -04:00
|
|
|
await withRetry(() => this.assetRepository.setComplete(id));
|
|
|
|
|
try {
|
|
|
|
|
await withRetry(() => this.storageRepository.utimes(path, new Date(), fileModifiedAt));
|
|
|
|
|
} catch (error: any) {
|
|
|
|
|
this.logger.error(`Failed to update times for ${path}: ${error.message}`);
|
|
|
|
|
}
|
|
|
|
|
await withRetry(() => this.jobRepository.queue(jobData));
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
async onCancel(assetId: string, path: string): Promise<void> {
|
2025-10-09 19:55:18 -04:00
|
|
|
this.logger.log('Cancelling upload for asset', assetId);
|
2025-10-06 19:47:27 -04:00
|
|
|
await withRetry(() => this.storageRepository.unlink(path));
|
|
|
|
|
await withRetry(() => this.assetRepository.removeAndDecrementQuota(assetId));
|
|
|
|
|
}
|
|
|
|
|
|
2025-10-08 00:02:36 -04:00
|
|
|
private addRequest(assetId: string, req: Readable) {
|
|
|
|
|
const addTime = new Date();
|
|
|
|
|
const activeRequest = { req, startTime: addTime };
|
|
|
|
|
this.abortExistingRequest(assetId, addTime);
|
|
|
|
|
this.activeRequests.set(assetId, activeRequest);
|
|
|
|
|
req.on('close', () => {
|
|
|
|
|
if (this.activeRequests.get(assetId)?.req === req) {
|
|
|
|
|
this.activeRequests.delete(assetId);
|
2025-10-01 14:41:21 -04:00
|
|
|
}
|
2025-09-24 13:56:46 -04:00
|
|
|
});
|
2025-10-08 00:02:36 -04:00
|
|
|
}
|
2025-10-01 14:41:21 -04:00
|
|
|
|
2025-10-08 00:02:36 -04:00
|
|
|
private abortExistingRequest(assetId: string, abortTime = new Date()) {
|
|
|
|
|
const abortEvent = { assetId, abortTime };
|
|
|
|
|
// only emit if we didn't just abort it ourselves
|
|
|
|
|
if (!this.onUploadAbort(abortEvent)) {
|
|
|
|
|
this.eventRepository.serverSend('UploadAbort', abortEvent);
|
|
|
|
|
}
|
2025-09-24 13:56:46 -04:00
|
|
|
}
|
|
|
|
|
|
2025-10-12 19:18:52 -04:00
|
|
|
  /**
   * Streams the request body into the write stream with manual backpressure,
   * counting bytes so a premature disconnect can be detected.
   *
   * @param size - expected number of body bytes (the request's Content-Length).
   */
  private pipe(req: Readable, writeStream: Writable, size: number) {
    let receivedLength = 0;
    req.on('data', (data: Buffer) => {
      receivedLength += data.length;
      // Honor backpressure: pause the request until the sink drains.
      if (!writeStream.write(data)) {
        req.pause();
        writeStream.once('drain', () => req.resume());
      }
    });

    req.on('close', () => {
      // 'close' fires on both normal end and client disconnect; fewer bytes
      // than declared means the connection dropped mid-body.
      if (receivedLength < size) {
        writeStream.emit('error', new Error('Request closed before all data received'));
      }
      writeStream.end();
    });
  }
|
|
|
|
|
|
2025-10-10 19:26:22 -04:00
|
|
|
private sendInterimResponse({ socket }: Response, location: string, interopVersion: number, limits: string): void {
|
2025-09-28 18:37:16 -04:00
|
|
|
if (socket && !socket.destroyed) {
|
|
|
|
|
// Express doesn't understand interim responses, so write directly to socket
|
|
|
|
|
socket.write(
|
2025-09-29 04:31:47 -04:00
|
|
|
'HTTP/1.1 104 Upload Resumption Supported\r\n' +
|
2025-09-28 18:37:16 -04:00
|
|
|
`Location: ${location}\r\n` +
|
2025-10-10 19:26:22 -04:00
|
|
|
`Upload-Limit: ${limits}\r\n` +
|
2025-09-29 04:31:47 -04:00
|
|
|
`Upload-Draft-Interop-Version: ${interopVersion}\r\n\r\n`,
|
2025-09-28 18:37:16 -04:00
|
|
|
);
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
2025-10-06 23:32:20 -04:00
|
|
|
private sendInconsistentLength(res: Response): void {
|
2025-10-01 11:57:29 -04:00
|
|
|
res.status(400).contentType('application/problem+json').send({
|
2025-09-29 18:09:06 -04:00
|
|
|
type: 'https://iana.org/assignments/http-problem-types#inconsistent-upload-length',
|
2025-09-28 18:37:16 -04:00
|
|
|
title: 'inconsistent length values for upload',
|
|
|
|
|
});
|
|
|
|
|
}
|
|
|
|
|
|
2025-10-06 23:32:20 -04:00
|
|
|
private sendAlreadyCompleted(res: Response): void {
|
2025-10-01 11:57:29 -04:00
|
|
|
res.status(400).contentType('application/problem+json').send({
|
2025-09-29 18:09:06 -04:00
|
|
|
type: 'https://iana.org/assignments/http-problem-types#completed-upload',
|
2025-09-28 18:37:16 -04:00
|
|
|
title: 'upload is already completed',
|
|
|
|
|
});
|
|
|
|
|
}
|
|
|
|
|
|
2025-10-06 23:32:20 -04:00
|
|
|
private sendOffsetMismatch(res: Response, expected: number, actual: number): void {
|
2025-10-01 11:57:29 -04:00
|
|
|
res.status(409).contentType('application/problem+json').setHeader('Upload-Offset', expected.toString()).send({
|
2025-09-29 04:31:47 -04:00
|
|
|
type: 'https://iana.org/assignments/http-problem-types#mismatching-upload-offset',
|
|
|
|
|
title: 'offset from request does not match offset of resource',
|
|
|
|
|
'expected-offset': expected,
|
|
|
|
|
'provided-offset': actual,
|
|
|
|
|
});
|
2025-09-28 18:37:16 -04:00
|
|
|
}
|
|
|
|
|
|
2025-10-06 23:32:20 -04:00
|
|
|
private sendChecksumMismatch(res: Response, assetId: string, path: string) {
|
2025-09-28 18:37:16 -04:00
|
|
|
this.logger.warn(`Removing upload asset ${assetId} due to checksum mismatch`);
|
2025-10-01 23:57:02 -04:00
|
|
|
res.status(460).send('File on server does not match provided checksum');
|
2025-09-29 18:09:06 -04:00
|
|
|
return this.onCancel(assetId, path);
|
2025-09-24 13:56:46 -04:00
|
|
|
}
|
|
|
|
|
|
2025-10-01 11:57:29 -04:00
|
|
|
private validateQuota(auth: AuthDto, size: number): void {
|
2025-10-08 00:02:36 -04:00
|
|
|
const { quotaSizeInBytes: quotaLimit, quotaUsageInBytes: currentUsage } = auth.user;
|
|
|
|
|
if (quotaLimit === null) {
|
2025-09-24 13:56:46 -04:00
|
|
|
return;
|
|
|
|
|
}
|
|
|
|
|
|
2025-10-08 00:02:36 -04:00
|
|
|
if (quotaLimit < currentUsage + size) {
|
2025-09-24 13:56:46 -04:00
|
|
|
throw new BadRequestException('Quota has been exceeded!');
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
private async getCurrentOffset(path: string): Promise<number> {
|
|
|
|
|
try {
|
|
|
|
|
const stat = await this.storageRepository.stat(path);
|
|
|
|
|
return stat.size;
|
|
|
|
|
} catch (error: any) {
|
|
|
|
|
if ((error as NodeJS.ErrnoException)?.code === 'ENOENT') {
|
|
|
|
|
return 0;
|
|
|
|
|
}
|
|
|
|
|
throw error;
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
2025-10-12 19:18:52 -04:00
|
|
|
private setCompleteHeader(res: Response, interopVersion: number | undefined, isComplete: boolean): void {
|
|
|
|
|
if (interopVersion === undefined || interopVersion > 3) {
|
2025-10-01 11:57:29 -04:00
|
|
|
res.setHeader('Upload-Complete', isComplete ? '?1' : '?0');
|
2025-09-29 04:31:47 -04:00
|
|
|
} else {
|
2025-10-01 11:57:29 -04:00
|
|
|
res.setHeader('Upload-Incomplete', isComplete ? '?0' : '?1');
|
2025-09-29 04:31:47 -04:00
|
|
|
}
|
|
|
|
|
}
|
2025-10-10 19:26:22 -04:00
|
|
|
|
|
|
|
|
private getUploadLimits({ upload }: SystemConfig['backup']) {
|
2025-10-10 21:08:16 -04:00
|
|
|
return `min-size=1, max-age=${upload.maxAgeHours * 3600}`;
|
2025-10-10 19:26:22 -04:00
|
|
|
}
|
2025-09-24 13:56:46 -04:00
|
|
|
}
|