From 7f50f268a52d67fb7d4b4c3d94b22e2aee3d2d66 Mon Sep 17 00:00:00 2001 From: mertalev <101130780+mertalev@users.noreply.github.com> Date: Wed, 24 Sep 2025 13:56:46 -0400 Subject: [PATCH] chunked upload controller --- pnpm-lock.yaml | 9 + server/Dockerfile | 1 + server/package.json | 1 + server/src/app.module.ts | 3 +- .../controllers/asset-upload.controller.ts | 42 ++ server/src/controllers/index.ts | 2 + server/src/dtos/upload.dto.ts | 133 ++++++ server/src/enum.ts | 3 + server/src/repositories/asset.repository.ts | 31 +- .../src/repositories/database.repository.ts | 24 + server/src/repositories/storage.repository.ts | 29 +- .../1759089915352-AddPartialAssetStatus.ts | 9 + server/src/services/asset-media.service.ts | 12 +- server/src/services/asset-upload.service.ts | 450 ++++++++++++++++++ server/src/services/index.ts | 2 + 15 files changed, 734 insertions(+), 17 deletions(-) create mode 100644 server/src/controllers/asset-upload.controller.ts create mode 100644 server/src/dtos/upload.dto.ts create mode 100644 server/src/schema/migrations/1759089915352-AddPartialAssetStatus.ts create mode 100644 server/src/services/asset-upload.service.ts diff --git a/pnpm-lock.yaml b/pnpm-lock.yaml index 3ef694e5fa..5628dca37b 100644 --- a/pnpm-lock.yaml +++ b/pnpm-lock.yaml @@ -511,6 +511,9 @@ importers: socket.io: specifier: ^4.8.1 version: 4.8.1 + structured-headers: + specifier: ^2.0.2 + version: 2.0.2 tailwindcss-preset-email: specifier: ^1.4.0 version: 1.4.0(tailwindcss@3.4.18(yaml@2.8.1)) @@ -10318,6 +10321,10 @@ packages: resolution: {integrity: sha512-KIy5nylvC5le1OdaaoCJ07L+8iQzJHGH6pWDuzS+d07Cu7n1MZ2x26P8ZKIWfbK02+XIL8Mp4RkWeqdUCrDMfg==} engines: {node: '>=18'} + structured-headers@2.0.2: + resolution: {integrity: sha512-IUul56vVHuMg2UxWhwDj9zVJE6ztYEQQkynr1FQ/NydPhivtk5+Qb2N1RS36owEFk2fNUriTguJ2R7htRObcdA==} + engines: {node: '>=18', npm: '>=6'} + style-to-js@1.1.17: resolution: {integrity: sha512-xQcBGDxJb6jjFCTzvQtfiPn6YvvP2O8U1MDIPNfJQlWMYfktPy+iGsHE7cssjs7y84d9fQaK4UF3RIJaAHSoYA==} @@ -23195,6 +23202,8 @@ snapshots: dependencies: '@tokenizer/token': 0.3.0 + structured-headers@2.0.2: {} + style-to-js@1.1.17: dependencies: style-to-object: 1.0.9 diff --git a/server/Dockerfile b/server/Dockerfile index 2ff62f9ab4..a18c930710 100644 --- a/server/Dockerfile +++ b/server/Dockerfile @@ -55,6 +55,7 @@ ENV NODE_ENV=production \ NVIDIA_DRIVER_CAPABILITIES=all \ NVIDIA_VISIBLE_DEVICES=all +COPY --from=builder /usr/bin/tusd /usr/bin/tusd COPY --from=server /output/server-pruned ./server COPY --from=web /usr/src/app/web/build /build/www COPY --from=cli /output/cli-pruned ./cli diff --git a/server/package.json b/server/package.json index 293e43c893..da898e8d82 100644 --- a/server/package.json +++ b/server/package.json @@ -104,6 +104,7 @@ "sharp": "^0.34.4", "sirv": "^3.0.0", "socket.io": "^4.8.1", + "structured-headers": "^2.0.2", "tailwindcss-preset-email": "^1.4.0", "thumbhash": "^0.1.1", "ua-parser-js": "^2.0.0", diff --git a/server/src/app.module.ts b/server/src/app.module.ts index a1cd1edfdf..641f550cee 100644 --- a/server/src/app.module.ts +++ b/server/src/app.module.ts @@ -29,7 +29,6 @@ import { getKyselyConfig } from 'src/utils/database'; const common = [...repositories, ...services, GlobalExceptionFilter]; export const middleware = [ - FileUploadInterceptor, { provide: APP_FILTER, useClass: GlobalExceptionFilter }, { provide: APP_PIPE, useValue: new ValidationPipe({ transform: true, whitelist: true }) }, { provide: APP_INTERCEPTOR, useClass: LoggingInterceptor }, @@ -87,7 +86,7 @@ class BaseModule implements OnModuleInit, OnModuleDestroy { @Module({ imports: [...imports, ScheduleModule.forRoot()], controllers: [...controllers], - providers: [...common, ...middleware, { provide: IWorker, useValue: ImmichWorker.Api }], + providers: [...common, ...middleware, FileUploadInterceptor, { provide: IWorker, useValue: ImmichWorker.Api }], }) export class ApiModule extends BaseModule {} diff --git a/server/src/controllers/asset-upload.controller.ts b/server/src/controllers/asset-upload.controller.ts new file mode 100644 index 0000000000..763bba510b --- /dev/null +++ b/server/src/controllers/asset-upload.controller.ts @@ -0,0 +1,42 @@ +import { Controller, Head, Param, Patch, Post, Req, Res } from '@nestjs/common'; +import { ApiTags } from '@nestjs/swagger'; +import { Request, Response } from 'express'; +import { AuthDto } from 'src/dtos/auth.dto'; +import { Permission } from 'src/enum'; +import { Auth, Authenticated } from 'src/middleware/auth.guard'; +import { AssetUploadService } from 'src/services/asset-upload.service'; +import { UUIDParamDto } from 'src/validation'; + +@ApiTags('Upload') +@Controller('upload') +export class AssetUploadController { + constructor(private service: AssetUploadService) {} + + @Post('asset') + @Authenticated({ sharedLink: true, permission: Permission.AssetUpload }) + handleInitialChunk(@Auth() auth: AuthDto, @Req() request: Request, @Res() response: Response): Promise { + return this.service.handleInitialChunk(auth, request, response); + } + + @Patch('asset/:id') + @Authenticated({ sharedLink: true, permission: Permission.AssetUpload }) + handleRemainingChunks( + @Auth() auth: AuthDto, + @Param() { id }: UUIDParamDto, + @Req() request: Request, + @Res() response: Response, + ): Promise { + return this.service.handleRemainingChunks(auth, id, request, response); + } + + @Head('asset/:id') + @Authenticated({ sharedLink: true, permission: Permission.AssetUpload }) + getUploadStatus( + @Auth() auth: AuthDto, + @Param() { id }: UUIDParamDto, + @Req() request: Request, + @Res() response: Response, + ): Promise { + return this.service.getUploadStatus(auth, id, request, response); + } +} diff --git a/server/src/controllers/index.ts b/server/src/controllers/index.ts index e3661ec794..18280c45ec 100644 --- a/server/src/controllers/index.ts +++ b/server/src/controllers/index.ts @@ -3,6 +3,7 @@ import { AlbumController } from 'src/controllers/album.controller'; import { ApiKeyController } from 'src/controllers/api-key.controller'; import { AppController } from 'src/controllers/app.controller'; import { AssetMediaController } from 'src/controllers/asset-media.controller'; +import { AssetUploadController } from 'src/controllers/asset-upload.controller'; import { AssetController } from 'src/controllers/asset.controller'; import { AuthAdminController } from 'src/controllers/auth-admin.controller'; import { AuthController } from 'src/controllers/auth.controller'; @@ -40,6 +41,7 @@ export const controllers = [ AppController, AssetController, AssetMediaController, + AssetUploadController, AuthController, AuthAdminController, DownloadController, diff --git a/server/src/dtos/upload.dto.ts b/server/src/dtos/upload.dto.ts new file mode 100644 index 0000000000..d90ceeeb18 --- /dev/null +++ b/server/src/dtos/upload.dto.ts @@ -0,0 +1,133 @@ +import { Type } from 'class-transformer'; +import { IsEnum, IsInt, IsNotEmpty, IsObject, IsString, IsUUID, ValidateNested } from 'class-validator'; +import { AssetMediaCreateDto } from 'src/dtos/asset-media.dto'; + +export enum TusdHookRequestType { + PreCreate = 'pre-create', + PreFinish = 'pre-finish', +} + +export enum TusdHookStorageType { + FileStore = 'filestore', +} + +export class TusdStorageDto { + @IsEnum(TusdHookStorageType) + Type!: string; + + @IsString() + @IsNotEmpty() + Path!: string; + + @IsString() + @IsNotEmpty() + InfoPath!: string; +} + +export class UploadAssetDataDto extends AssetMediaCreateDto { + @IsString() + @IsNotEmpty() + declare filename: string; +} + +export class TusdMetaDataDto { + @IsString() + @IsNotEmpty() + declare AssetData: string; // base64-encoded JSON string of UploadAssetDataDto +} + +export class TusdPreCreateUploadDto { + @IsInt() + Size!: number; +} + +export class TusdPreFinishUploadDto { + @IsUUID() + ID!: string; + + @IsInt() + Size!: number; + + @Type(() => TusdMetaDataDto) + @ValidateNested() + @IsObject() + MetaData!: TusdMetaDataDto; + + @Type(() => TusdStorageDto) + @ValidateNested() + @IsObject() + Storage!: TusdStorageDto; +} + +export class TusdHttpRequestDto { + @IsString() + @IsNotEmpty() + Method!: string; + + @IsString() + @IsNotEmpty() + URI!: string; + + @IsObject() + Header!: Record; +} + +export class TusdPreCreateEventDto { + @Type(() => TusdPreCreateUploadDto) + @ValidateNested() + @IsObject() + Upload!: TusdPreCreateUploadDto; + + @Type(() => TusdHttpRequestDto) + @ValidateNested() + @IsObject() + HTTPRequest!: TusdHttpRequestDto; +} + +export class TusdPreFinishEventDto { + @Type(() => TusdPreFinishUploadDto) + @ValidateNested() + @IsObject() + Upload!: TusdPreFinishUploadDto; + + @Type(() => TusdHttpRequestDto) + @ValidateNested() + @IsObject() + HTTPRequest!: TusdHttpRequestDto; +} + +export class TusdHookRequestDto { + @IsEnum(TusdHookRequestType) + Type!: TusdHookRequestType; + + @IsObject() + Event!: TusdPreCreateEventDto | TusdPreFinishEventDto; +} + +export class TusdHttpResponseDto { + StatusCode!: number; + + Body?: string; + + Header?: Record; +} + +export class TusdChangeFileInfoStorageDto { + Path?: string; +} + +export class TusdChangeFileInfoDto { + ID?: string; + + MetaData?: TusdMetaDataDto; + + Storage?: TusdChangeFileInfoStorageDto; +} + +export class TusdHookResponseDto { + HTTPResponse?: TusdHttpResponseDto; + + RejectUpload?: boolean; + + ChangeFileInfo?: TusdChangeFileInfoDto; +} diff --git a/server/src/enum.ts b/server/src/enum.ts index 646138b060..37f3a34f86 100644 --- a/server/src/enum.ts +++ b/server/src/enum.ts @@ -20,6 +20,7 @@ export enum ImmichHeader { SharedLinkSlug = 'x-immich-share-slug', Checksum = 'x-immich-checksum', Cid = 'x-immich-cid', + AssetData = 'x-immich-asset-data', } export enum ImmichQuery { @@ -303,6 +304,7 @@ export enum AssetStatus { Active = 'active', Trashed = 'trashed', Deleted = 'deleted', + Partial = 'partial', } export enum SourceType { @@ -493,6 +495,7 @@ export enum BootstrapEventPriority { JobService = -190, // Initialise config after other bootstrap services, stop other services from using config on bootstrap SystemConfig = 100, + UploadService = 200, } export enum QueueName { diff --git a/server/src/repositories/asset.repository.ts b/server/src/repositories/asset.repository.ts index 5c3bd8996c..c5b0a414e7 100644 --- a/server/src/repositories/asset.repository.ts +++ b/server/src/repositories/asset.repository.ts @@ -253,6 +253,29 @@ export class AssetRepository { return this.db.insertInto('asset').values(asset).returningAll().executeTakeFirstOrThrow(); } + getCompletionMetadata(assetId: string, ownerId: string) { + return this.db + .selectFrom('asset') + .select(['originalPath as path', 'status', 'fileModifiedAt', 'createdAt', 'checksum']) + .where('id', '=', assetId) + .where('ownerId', '=', ownerId) + .executeTakeFirst(); + } + + setComplete(assetId: string, ownerId: string, size: number) { + return this.db + .with('exif', (qb) => qb.insertInto('asset_exif').values({ assetId, fileSizeInByte: size })) + .with('user', (qb) => + qb + .updateTable('user') + .set({ quotaUsageInBytes: sql`"quotaUsageInBytes" + ${size}` }) + .where('id', '=', ownerId), + ) + .updateTable('asset') + .set({ status: AssetStatus.Active }) + .execute(); + } + createAll(assets: Insertable[]) { return this.db.insertInto('asset').values(assets).returningAll().execute(); } @@ -492,17 +515,15 @@ export class AssetRepository { } @GenerateSql({ params: [DummyValue.UUID, DummyValue.BUFFER] }) - async getUploadAssetIdByChecksum(ownerId: string, checksum: Buffer): Promise { - const asset = await this.db + getUploadAssetIdByChecksum(ownerId: string, checksum: Buffer) { + return this.db .selectFrom('asset') - .select('id') + .select(['id', 'status', 'createdAt']) .where('ownerId', '=', asUuid(ownerId)) .where('checksum', '=', checksum) .where('libraryId', 'is', null) .limit(1) .executeTakeFirst(); - - return asset?.id; } findLivePhotoMatch(options: LivePhotoSearchOptions) { diff --git a/server/src/repositories/database.repository.ts b/server/src/repositories/database.repository.ts index e5d88339c8..d4a3069721 100644 --- a/server/src/repositories/database.repository.ts +++ b/server/src/repositories/database.repository.ts @@ -451,6 +451,22 @@ export class DatabaseRepository { return res as R; } + async withUuidLock(uuid: string, callback: () => Promise): Promise { + let res; + await this.asyncLock.acquire(uuid, async () => { + await this.db.connection().execute(async (connection) => { + try { + await this.acquireUuidLock(uuid, connection); + res = await callback(); + } finally { + await this.releaseUuidLock(uuid, connection); + } + }); + }); + + return res as R; + } + tryLock(lock: DatabaseLock): Promise { return this.db.connection().execute(async (connection) => this.acquireTryLock(lock, connection)); } @@ -467,6 +483,10 @@ export class DatabaseRepository { await sql`SELECT pg_advisory_lock(${lock})`.execute(connection); } + private async acquireUuidLock(uuid: string, connection: Kysely): Promise { + await sql`SELECT pg_advisory_lock(uuid_hash_extended(${uuid}), 0)`.execute(connection); + } + private async acquireTryLock(lock: DatabaseLock, connection: Kysely): Promise { const { rows } = await sql<{ pg_try_advisory_lock: boolean; @@ -477,4 +497,8 @@ export class DatabaseRepository { private async releaseLock(lock: DatabaseLock, connection: Kysely): Promise { await sql`SELECT pg_advisory_unlock(${lock})`.execute(connection); } + + private async releaseUuidLock(uuid: string, connection: Kysely): Promise { + await sql`SELECT pg_advisory_unlock(uuid_hash_extended(${uuid}), 0)`.execute(connection); + } } diff --git a/server/src/repositories/storage.repository.ts b/server/src/repositories/storage.repository.ts index 7d6b634845..9ece4fc722 100644 --- a/server/src/repositories/storage.repository.ts +++ b/server/src/repositories/storage.repository.ts @@ -2,7 +2,7 @@ import { Injectable } from '@nestjs/common'; import archiver from 'archiver'; import chokidar, { ChokidarOptions } from 'chokidar'; import { escapePath, glob, globStream } from 'fast-glob'; -import { constants, createReadStream, createWriteStream, existsSync, mkdirSync } from 'node:fs'; +import { constants, createReadStream, createWriteStream, existsSync, mkdirSync, unlinkSync } from 'node:fs'; import fs from 'node:fs/promises'; import path from 'node:path'; import { Readable, Writable } from 'node:stream'; @@ -65,6 +65,14 @@ export class StorageRepository { return createWriteStream(filepath, { flags: 'w' }); } + overwriteWriteStream(filepath: string, offset = 0): Writable { + return createWriteStream(filepath, { flags: 'r+', start: offset }); + } + + createOrAppendWriteStream(filepath: string): Writable { + return createWriteStream(filepath, { flags: 'a' }); + } + createOrOverwriteFile(filepath: string, buffer: Buffer) { return fs.writeFile(filepath, buffer, { flag: 'w' }); } @@ -134,6 +142,16 @@ export class StorageRepository { } } + unlinkSync(file: string) { + try { + unlinkSync(file); + } catch (error) { + if ((error as NodeJS.ErrnoException)?.code !== 'ENOENT') { + throw error; + } + } + } + async unlinkDir(folder: string, options: { recursive?: boolean; force?: boolean }) { await fs.rm(folder, options); } @@ -156,10 +174,13 @@ export class StorageRepository { } } + mkdir(filepath: string): Promise { + return fs.mkdir(filepath, { recursive: true }); + } + mkdirSync(filepath: string): void { - if (!existsSync(filepath)) { - mkdirSync(filepath, { recursive: true }); - } + // does not throw an error if the folder already exists + mkdirSync(filepath, { recursive: true }); } existsSync(filepath: string) { diff --git a/server/src/schema/migrations/1759089915352-AddPartialAssetStatus.ts b/server/src/schema/migrations/1759089915352-AddPartialAssetStatus.ts new file mode 100644 index 0000000000..1633aea177 --- /dev/null +++ b/server/src/schema/migrations/1759089915352-AddPartialAssetStatus.ts @@ -0,0 +1,9 @@ +import { Kysely, sql } from 'kysely'; + +export async function up(db: Kysely): Promise { + await sql`ALTER TYPE "assets_status_enum" ADD VALUE IF NOT EXISTS 'partial'`.execute(db); +} + +export async function down(db: Kysely): Promise { + // Cannot remove enum values in PostgreSQL +} diff --git a/server/src/services/asset-media.service.ts b/server/src/services/asset-media.service.ts index 0747bd7b7b..dd8fcce91c 100644 --- a/server/src/services/asset-media.service.ts +++ b/server/src/services/asset-media.service.ts @@ -43,12 +43,12 @@ export class AssetMediaService extends BaseService { return; } - const assetId = await this.assetRepository.getUploadAssetIdByChecksum(auth.user.id, fromChecksum(checksum)); - if (!assetId) { + const asset = await this.assetRepository.getUploadAssetIdByChecksum(auth.user.id, fromChecksum(checksum)); + if (!asset) { return; } - return { id: assetId, status: AssetMediaStatus.DUPLICATE }; + return { id: asset.id, status: AssetMediaStatus.DUPLICATE }; } canUploadFile({ auth, fieldName, file }: UploadRequest): true { @@ -313,12 +313,12 @@ export class AssetMediaService extends BaseService { // handle duplicates with a success response if (isAssetChecksumConstraint(error)) { - const duplicateId = await this.assetRepository.getUploadAssetIdByChecksum(auth.user.id, file.checksum); - if (!duplicateId) { + const duplicate = await this.assetRepository.getUploadAssetIdByChecksum(auth.user.id, file.checksum); + if (!duplicate) { this.logger.error(`Error locating duplicate for checksum constraint`); throw new InternalServerErrorException(); } - return { status: AssetMediaStatus.DUPLICATE, id: duplicateId }; + return { status: AssetMediaStatus.DUPLICATE, id: duplicate.id }; } this.logger.error(`Error uploading file ${error}`, error?.stack); diff --git a/server/src/services/asset-upload.service.ts b/server/src/services/asset-upload.service.ts new file mode 100644 index 0000000000..7fdb6e2271 --- /dev/null +++ b/server/src/services/asset-upload.service.ts @@ -0,0 +1,450 @@ +import { BadRequestException, Injectable, InternalServerErrorException } from '@nestjs/common'; +import { plainToInstance } from 'class-transformer'; +import { validateSync } from 'class-validator'; +import { Request, Response } from 'express'; +import { createHash } from 'node:crypto'; +import { extname, join } from 'node:path'; +import { StorageCore } from 'src/cores/storage.core'; +import { AuthDto } from 'src/dtos/auth.dto'; +import { UploadAssetDataDto } from 'src/dtos/upload.dto'; +import { AssetStatus, AssetType, AssetVisibility, ImmichHeader, JobName, StorageFolder } from 'src/enum'; +import { BaseService } from 'src/services/base.service'; +import { isAssetChecksumConstraint } from 'src/utils/database'; +import { mimeTypes } from 'src/utils/mime-types'; +import { isInnerList, parseDictionary } from 'structured-headers'; + +@Injectable() +export class AssetUploadService extends BaseService { + async handleInitialChunk(auth: AuthDto, request: Request, response: Response): Promise { + const headers = request.headers; + const contentLength = this.getNumberOrThrow(headers, 'content-length'); + const isComplete = this.getIsCompleteOrThrow(headers); + const checksumHeader = this.getChecksumOrThrow(headers); + + const metadata = this.getAssetDataOrThrow(headers); + const assetId = this.cryptoRepository.randomUUID(); + const folder = StorageCore.getNestedFolder(StorageFolder.Upload, auth.user.id, assetId); + const extension = extname(metadata.filename); + const path = join(folder, `${assetId}${extension}`); + const type = mimeTypes.assetType(path); + if (type === AssetType.Other) { + throw new BadRequestException(`${metadata.filename} is an unsupported file type`); + } + this.requireQuota(auth, contentLength); + + try { + await this.assetRepository.create({ + id: assetId, + ownerId: auth.user.id, + libraryId: null, + checksum: checksumHeader, + originalPath: path, + deviceAssetId: metadata.deviceAssetId, + deviceId: metadata.deviceId, + fileCreatedAt: metadata.fileCreatedAt, + fileModifiedAt: metadata.fileModifiedAt, + localDateTime: metadata.fileCreatedAt, + type: mimeTypes.assetType(path), + isFavorite: metadata.isFavorite, + duration: metadata.duration || null, + visibility: metadata.visibility || AssetVisibility.Timeline, + originalFileName: metadata.filename, + status: AssetStatus.Partial, + }); + } catch (error: any) { + if (isAssetChecksumConstraint(error)) { + const duplicate = await this.assetRepository.getUploadAssetIdByChecksum(auth.user.id, checksumHeader); + if (!duplicate) { + throw new InternalServerErrorException('Error locating duplicate for checksum constraint'); + } + + if (duplicate.status === AssetStatus.Partial) { + response.status(201).setHeader('location', this.createLocation(headers, assetId)).send(); + } else { + response.status(400).contentType('application/problem+json').send({ + type: 'https://iana.org/assignments/http-problem-types#completed-upload', + title: 'upload is already completed', + }); + } + return; + } + this.logger.error(`Error creating upload asset record: ${error.message}`); + response.status(500).send('Error creating upload asset record'); + return; + } + await this.storageRepository.mkdir(folder); + let checksumBuffer: Buffer | undefined; + const writeStream = this.storageRepository.createWriteStream(path); + + if (isComplete) { + const hash = createHash('sha1'); + request.on('data', (chunk: Buffer) => hash.update(chunk)); + writeStream.on('finish', () => (checksumBuffer = hash.digest())); + } + + writeStream.on('error', (error) => { + this.logger.error(`Failed to write chunk to ${path}: ${error.message}`); + if (!response.headersSent) { + return response.status(500).setHeader('location', this.createLocation(headers, assetId)).send(); + } + }); + + writeStream.on('finish', () => { + if (!isComplete) { + return response.status(201).setHeader('location', this.createLocation(headers, assetId)).send(); + } + + this.logger.log(`Finished upload to ${path}`); + this.assertChecksum(checksumHeader, checksumBuffer!, path, assetId); + response.status(201).setHeader('Upload-Complete', '?1').send(); + void this.onCompletion({ + assetId, + ownerId: auth.user.id, + path, + size: contentLength, + fileModifiedAt: metadata.fileModifiedAt, + }); + }); + + request.on('error', (error) => { + this.logger.error(`Failed to read request body: ${error.message}`); + writeStream.end(); + if (!response.headersSent) { + return response.status(500).setHeader('location', this.createLocation(headers, assetId)).send(); + } + }); + + let receivedLength = 0; + request.on('data', (chunk: Buffer) => { + if (receivedLength + chunk.length > contentLength) { + writeStream.destroy(); + request.destroy(); + this.onPermanentFailure(assetId, path); + response.status(400).send('Received more data than specified in content-length'); + } + receivedLength += chunk.length; + if (!writeStream.write(chunk)) { + request.pause(); + writeStream.once('drain', () => request.resume()); + } + }); + + request.on('end', () => { + if (receivedLength !== contentLength) { + this.logger.error(`Received ${receivedLength} bytes when expecting ${contentLength} for ${assetId}`); + writeStream.destroy(); + this.onPermanentFailure(assetId, path); + } + }); + } + + async handleRemainingChunks(auth: AuthDto, assetId: string, request: Request, response: Response): Promise { + const headers = request.headers; + const headerIsComplete = this.getIsCompleteOrThrow(headers); + const contentLength = this.getNumberOrThrow(headers, 'content-length'); + await this.databaseRepository.withUuidLock(assetId, async () => { + const asset = await this.assetRepository.getCompletionMetadata(assetId, auth.user.id); + if (!asset) { + response.status(404).send('Asset not found'); + return; + } + + const { path } = asset; + if (asset.status !== AssetStatus.Partial) { + response.status(400).contentType('application/problem+json').send({ + type: 'https://iana.org/assignments/http-problem-types#completed-upload', + title: 'upload is already completed', + }); + return; + } + + const providedOffset = this.getNumber(headers, 'upload-offset') ?? 0; + const expectedOffset = await this.getCurrentOffset(path); + + if (expectedOffset !== providedOffset) { + response.status(409).contentType('application/problem+json').setHeader('upload-complete', '?0').send({ + type: 'https://iana.org/assignments/http-problem-types#mismatching-upload-offset', + title: 'offset from request does not match offset of resource', + 'expected-offset': expectedOffset, + 'provided-offset': providedOffset, + }); + } + + const newLength = providedOffset + contentLength; + this.requireQuota(auth, newLength); + + if (contentLength === 0) { + response.status(204).send(); + return; + } + + const writeStream = this.storageRepository.createOrAppendWriteStream(path); + writeStream.on('error', (error) => { + this.logger.error(`Failed to write chunk to ${path}: ${error.message}`); + if (!response.headersSent) { + response.status(500).send('Failed to write chunk'); + } + }); + + writeStream.on('finish', async () => { + if (headerIsComplete) { + this.logger.log(`Finished upload to ${path}`); + const checksum = await this.cryptoRepository.hashFile(path); + this.assertChecksum(asset.checksum, checksum, path, assetId); + response.status(201).setHeader('upload-complete', '?1').send(); + await this.onCompletion({ + assetId, + ownerId: auth.user.id, + path, + size: newLength, + fileModifiedAt: asset.fileModifiedAt, + }); + } else { + response.status(204).send(); + } + }); + + let receivedLength = 0; + request.on('data', (chunk: Buffer) => { + if (receivedLength + chunk.length > contentLength) { + this.logger.error(`Received more data than specified in content-length for upload to ${path}`); + writeStream.destroy(new Error('Received more data than specified in content-length')); + request.destroy(); + void this.onPermanentFailure(assetId, path); + return; + } + + receivedLength += chunk.length; + if (!writeStream.write(chunk)) { + request.pause(); + writeStream.once('drain', () => request.resume()); + } + }); + + request.on('end', () => { + if (receivedLength < contentLength) { + this.logger.error(`Received less data than specified in content-length for upload to ${path}`); + writeStream.destroy(new Error('Received less data than specified in content-length')); + void this.onPermanentFailure(assetId, path); + return; + } + writeStream.end(); + }); + }); + } + + getUploadStatus(auth: AuthDto, assetId: string, request: Request, response: Response) { + const headers = request.headers; + const interopVersion = this.getInteropVersion(headers); + return this.databaseRepository.withUuidLock(assetId, async () => { + const asset = await this.assetRepository.getCompletionMetadata(assetId, auth.user.id); + if (!asset) { + response.status(404).send('Asset not found'); + return; + } + + if (interopVersion !== null && interopVersion < 2) { + response.setHeader('upload-incomplete', asset.status === AssetStatus.Partial ? '?1' : '?0'); + } else { + response.setHeader('upload-complete', asset.status === AssetStatus.Partial ? '?0' : '?1'); + } + + response + .status(204) + .setHeader('upload-offset', await this.getCurrentOffset(asset.path)) + .send(); + }); + } + + private async onCompletion(data: { + assetId: string; + ownerId: string; + path: string; + size: number; + fileModifiedAt: Date; + }): Promise { + const { assetId, ownerId, path, size, fileModifiedAt } = data; + const jobData = { name: JobName.AssetExtractMetadata, data: { id: assetId, source: 'upload' } } as const; + await this.withRetry(() => this.assetRepository.setComplete(assetId, ownerId, size), 2); + await this.withRetry(() => this.jobRepository.queue(jobData), 2); + await this.withRetry(() => this.storageRepository.utimes(path, new Date(), fileModifiedAt), 2); + } + + private async onPermanentFailure(assetId: string, path: string): Promise { + await this.withRetry(() => this.storageRepository.unlink(path), 2); + await this.withRetry(() => this.assetRepository.remove({ id: assetId }), 2); + } + + private async withRetry(operation: () => Promise, retries: number): Promise { + let lastError: any; + for (let attempt = 0; attempt <= retries; attempt++) { + try { + return await operation(); + } catch (error: any) { + lastError = error; + } + } + throw lastError; + } + + private async tryUnlink(path: string): Promise { + try { + await this.storageRepository.unlink(path); + } catch { + this.logger.warn(`Failed to remove file at ${path}`); + } + } + + private requireQuota(auth: AuthDto, size: number) { + if (auth.user.quotaSizeInBytes === null) { + return; + } + + if (auth.user.quotaSizeInBytes < auth.user.quotaUsageInBytes + size) { + throw new BadRequestException('Quota has been exceeded!'); + } + } + + private async getCurrentOffset(path: string): Promise { + try { + const stat = await this.storageRepository.stat(path); + return stat.size; + } catch (error: any) { + if ((error as NodeJS.ErrnoException)?.code === 'ENOENT') { + return 0; + } + throw error; + } + } + + private getNumberOrThrow(headers: Request['headers'], header: string): number { + const value = this.getNumber(headers, header); + if (value === null) { + throw new BadRequestException(`Missing ${header} header`); + } + return value; + } + + private getNumber(headers: Request['headers'], header: string): number | null { + const value = headers[header] as string | undefined; + if (value === undefined) { + return null; + } + + const parsedValue = parseInt(value); + if (!isFinite(parsedValue) || parsedValue < 0) { + throw new BadRequestException(`Invalid ${header} header`); + } + return parsedValue; + } + + private getChecksumOrThrow(headers: Request['headers']): Buffer { + const value = headers['repr-digest'] as string | undefined; + if (value === undefined) { + throw new BadRequestException(`Missing 'repr-digest' header`); + } + + const sha1Item = parseDictionary(value).get('sha'); + if (!sha1Item) { + throw new BadRequestException(`Missing 'sha' in 'repr-digest' header`); + } + + if (isInnerList(sha1Item)) { + throw new BadRequestException(`Invalid 'sha' in 'repr-digest' header`); + } + const checksum = sha1Item[0]; + if (!(checksum instanceof ArrayBuffer)) { + throw new BadRequestException(`Invalid 'sha' in 'repr-digest' header`); + } + + return Buffer.from(checksum); + } + + private getIsCompleteOrThrow(headers: Request['headers']): boolean { + const isComplete = headers['upload-complete'] as string | undefined; + if (isComplete !== undefined) { + return isComplete === '?1'; + } + + // old drafts use this header + const isIncomplete = headers['upload-incomplete'] as string | undefined; + if (isIncomplete !== undefined) { + return isIncomplete === '?0'; + } + + throw new BadRequestException(`Missing 'upload-complete' header`); + } + + private getAssetDataOrThrow(headers: Request['headers']): UploadAssetDataDto { + const value = headers[ImmichHeader.AssetData] as string | undefined; + if (value === undefined) { + throw new BadRequestException(`Missing ${ImmichHeader.AssetData} header`); + } + + let assetData: any; + try { + assetData = JSON.parse(Buffer.from(value, 'base64').toString('utf8')); + } catch { + throw new BadRequestException(`${ImmichHeader.AssetData} header is not valid base64-encoded JSON`); + } + const dto = plainToInstance(UploadAssetDataDto, assetData); + const assetDataErrors = validateSync(dto, { whitelist: true }); + if (assetDataErrors.length > 0) { + const formatted = assetDataErrors.map((e) => (e.constraints ? Object.values(e.constraints).join(', ') : '')); + throw new BadRequestException(`Invalid ${ImmichHeader.AssetData} header: ${formatted.join('; ')}`); + } + + if (!mimeTypes.isAsset(dto.filename)) { + throw new BadRequestException(`${dto.filename} is an unsupported file type`); + } + return dto; + } + + private getInteropVersion(headers: Request['headers']): number | null { + const value = headers['upload-draft-interop-version'] as string | undefined; + if (value === undefined) { + return null; + } + + const parsedValue = parseInt(value); + if (!isFinite(parsedValue) || parsedValue < 0) { + throw new BadRequestException(`Invalid Upload-Draft-Interop-Version header`); + } + return parsedValue; + } + + private createLocation(headers: Request['headers'], assetId: string): string { + const forwardedProto = headers['x-forwarded-proto'] ?? 'http'; + return `${forwardedProto}://${this.getForwardedHost(headers)}/api/upload/asset/${assetId}`; + } + + private assertChecksum(checksum1: Buffer, checksum2: Buffer, assetId: string, path: string): void { + if (checksum1.compare(checksum2) !== 0) { + this.logger.warn(`Checksum mismatch for upload to ${path}`); + void this.onPermanentFailure(assetId, path); + throw new BadRequestException('Checksum mismatch'); + } + } + + private getForwardedHost(headers: Request['headers']): string | undefined { + const forwardedHost = headers['x-forwarded-host']; + if (typeof forwardedHost === 'string') { + return forwardedHost; + } + + const forwarded = headers['forwarded'] as string | undefined; + if (forwarded) { + const parts = parseDictionary(forwarded); + const hostItem = parts.get('host'); + if (hostItem && !isInnerList(hostItem)) { + const item = hostItem[0]; + if (typeof item === 'string') { + return item; + } + } + } + + const { host, port } = this.configRepository.getEnv(); + return `${host ?? 'localhost'}:${port}`; + } +} diff --git a/server/src/services/index.ts b/server/src/services/index.ts index cad38ca1f4..6771baf4a3 100644 --- a/server/src/services/index.ts +++ b/server/src/services/index.ts @@ -3,6 +3,7 @@ import { AlbumService } from 'src/services/album.service'; import { ApiKeyService } from 'src/services/api-key.service'; import { ApiService } from 'src/services/api.service'; import { AssetMediaService } from 'src/services/asset-media.service'; +import { AssetUploadService } from 'src/services/asset-upload.service'; import { AssetService } from 'src/services/asset.service'; import { AuditService } from 'src/services/audit.service'; import { AuthAdminService } from 'src/services/auth-admin.service'; @@ -47,6 +48,7 @@ export const services = [ AlbumService, ApiService, AssetMediaService, + AssetUploadService, AssetService, AuditService, AuthService,