From 0105c9e2b68b3f6aaa65e247a33c8f4192da273a Mon Sep 17 00:00:00 2001 From: mertalev <101130780+mertalev@users.noreply.github.com> Date: Thu, 2 Oct 2025 18:34:49 -0400 Subject: [PATCH] clean up stale uploads stale upload cleanup try/catch file check --- server/src/dtos/upload.dto.ts | 10 +--- server/src/enum.ts | 2 + .../src/repositories/asset-job.repository.ts | 50 +++++++++++++++++-- server/src/repositories/asset.repository.ts | 2 +- server/src/services/asset-media.service.ts | 4 ++ server/src/services/asset-upload.service.ts | 46 ++++++++++++++++- server/src/types.ts | 2 + 7 files changed, 102 insertions(+), 14 deletions(-) diff --git a/server/src/dtos/upload.dto.ts b/server/src/dtos/upload.dto.ts index 1f70afff85..959334d3aa 100644 --- a/server/src/dtos/upload.dto.ts +++ b/server/src/dtos/upload.dto.ts @@ -2,8 +2,8 @@ import { BadRequestException } from '@nestjs/common'; import { Expose, plainToInstance, Transform, Type } from 'class-transformer'; import { Equals, IsArray, IsEnum, IsInt, IsNotEmpty, IsString, Min, ValidateIf, ValidateNested } from 'class-validator'; import { AssetMetadataUpsertItemDto } from 'src/dtos/asset.dto'; -import { AssetVisibility, ImmichHeader } from 'src/enum'; -import { Optional, ValidateBoolean, ValidateDate, ValidateEnum, ValidateUUID } from 'src/validation'; +import { ImmichHeader } from 'src/enum'; +import { Optional, ValidateBoolean, ValidateDate } from 'src/validation'; import { parseDictionary } from 'structured-headers'; export class UploadAssetDataDto { @@ -32,12 +32,6 @@ export class UploadAssetDataDto { @ValidateBoolean({ optional: true }) isFavorite?: boolean; - @ValidateEnum({ enum: AssetVisibility, name: 'AssetVisibility', optional: true }) - visibility?: AssetVisibility; - - @ValidateUUID({ optional: true }) - livePhotoVideoId?: string; - @Transform(({ value }) => { try { const json = JSON.parse(value); diff --git a/server/src/enum.ts b/server/src/enum.ts index 37f3a34f86..4b7c1ab3c1 100644 --- a/server/src/enum.ts +++ b/server/src/enum.ts @@ -531,6 +531,8 @@ export enum JobName { AssetFileMigration = 'AssetFileMigration', AssetGenerateThumbnailsQueueAll = 'AssetGenerateThumbnailsQueueAll', AssetGenerateThumbnails = 'AssetGenerateThumbnails', + PartialAssetDelete = 'PartialAssetCleanup', + PartialAssetDeleteQueueAll = 'PartialAssetCleanupQueueAll', AuditLogCleanup = 'AuditLogCleanup', AuditTableCleanup = 'AuditTableCleanup', diff --git a/server/src/repositories/asset-job.repository.ts b/server/src/repositories/asset-job.repository.ts index ca1291b852..de97bc2c4c 100644 --- a/server/src/repositories/asset-job.repository.ts +++ b/server/src/repositories/asset-job.repository.ts @@ -1,10 +1,10 @@ import { Injectable } from '@nestjs/common'; -import { Kysely } from 'kysely'; +import { Kysely, sql } from 'kysely'; import { jsonArrayFrom } from 'kysely/helpers/postgres'; import { InjectKysely } from 'nestjs-kysely'; import { Asset, columns } from 'src/database'; import { DummyValue, GenerateSql } from 'src/decorators'; -import { AssetFileType, AssetType, AssetVisibility } from 'src/enum'; +import { AssetFileType, AssetStatus, AssetType, AssetVisibility } from 'src/enum'; import { DB } from 'src/schema'; import { StorageAsset } from 'src/types'; import { @@ -28,6 +28,7 @@ export class AssetJobRepository { return this.db .selectFrom('asset') .where('asset.id', '=', asUuid(id)) + .where('asset.status', '!=', sql.lit(AssetStatus.Partial)) .leftJoin('smart_search', 'asset.id', 'smart_search.assetId') .select(['id', 'type', 'ownerId', 'duplicateId', 'stackId', 'visibility', 'smart_search.embedding']) .limit(1) @@ -39,6 +40,7 @@ export class AssetJobRepository { return this.db .selectFrom('asset') .where('asset.id', '=', asUuid(id)) + .where('asset.status', '!=', sql.lit(AssetStatus.Partial)) .select(['id', 'sidecarPath', 'originalPath']) .select((eb) => jsonArrayFrom( @@ -58,6 +60,7 @@ export class AssetJobRepository { return this.db .selectFrom('asset') .where('asset.id', '=', asUuid(id)) + .where('asset.status', '!=', sql.lit(AssetStatus.Partial)) .select(['id', 'sidecarPath', 'originalPath']) .limit(1) .executeTakeFirst(); @@ -69,6 +72,7 @@ export class AssetJobRepository { .selectFrom('asset') .select(['asset.id', 'asset.thumbhash']) .select(withFiles) + .where('asset.status', '!=', sql.lit(AssetStatus.Partial)) .where('asset.deletedAt', 'is', null) .where('asset.visibility', '!=', AssetVisibility.Hidden) .$if(!force, (qb) => @@ -93,6 +97,7 @@ export class AssetJobRepository { .select(['asset.id', 'asset.ownerId', 'asset.encodedVideoPath']) .select(withFiles) .where('asset.id', '=', id) + .where('asset.status', '!=', sql.lit(AssetStatus.Partial)) .executeTakeFirst(); } @@ -112,6 +117,7 @@ export class AssetJobRepository { .select(withFiles) .$call(withExifInner) .where('asset.id', '=', id) + .where('asset.status', '!=', sql.lit(AssetStatus.Partial)) .executeTakeFirst(); } @@ -122,6 +128,7 @@ export class AssetJobRepository { .select(columns.asset) .select(withFaces) .where('asset.id', '=', id) + .where('asset.status', '!=', sql.lit(AssetStatus.Partial)) .executeTakeFirst(); } @@ -139,6 +146,7 @@ export class AssetJobRepository { return this.db .selectFrom('asset') .where('asset.visibility', '!=', AssetVisibility.Hidden) + .where('asset.status', '!=', sql.lit(AssetStatus.Partial)) .where('asset.deletedAt', 'is', null) .innerJoin('asset_job_status as job_status', 'assetId', 'asset.id') .where('job_status.previewAt', 'is not', null); @@ -149,6 +157,7 @@ export class AssetJobRepository { return this.db .selectFrom('asset') .select(['asset.id']) + .where('asset.status', '!=', sql.lit(AssetStatus.Partial)) .where('asset.deletedAt', 'is', null) .innerJoin('smart_search', 'asset.id', 'smart_search.assetId') .$call(withDefaultVisibility) @@ -177,6 +186,7 @@ export class AssetJobRepository { .select(['asset.id', 'asset.visibility']) .select((eb) => withFiles(eb, AssetFileType.Preview)) .where('asset.id', '=', id) + .where('asset.status', '!=', sql.lit(AssetStatus.Partial)) .executeTakeFirst(); } @@ -189,6 +199,7 @@ export class AssetJobRepository { .select((eb) => withFaces(eb, true)) .select((eb) => withFiles(eb, AssetFileType.Preview)) .where('asset.id', '=', id) + .where('asset.status', '!=', sql.lit(AssetStatus.Partial)) .executeTakeFirst(); } @@ -241,6 +252,7 @@ export class AssetJobRepository { ) .select((eb) => toJson(eb, 'stacked_assets').as('stack')) .where('asset.id', '=', id) + .where('asset.status', '!=', sql.lit(AssetStatus.Partial)) .executeTakeFirst(); } @@ -255,6 +267,7 @@ export class AssetJobRepository { .where((eb) => eb.or([eb('asset.encodedVideoPath', 'is', null), eb('asset.encodedVideoPath', '=', '')])) .where('asset.visibility', '!=', AssetVisibility.Hidden), ) + .where('asset.status', '!=', sql.lit(AssetStatus.Partial)) .where('asset.deletedAt', 'is', null) .stream(); } @@ -266,6 +279,7 @@ export class AssetJobRepository { .select(['asset.id', 'asset.ownerId', 'asset.originalPath', 'asset.encodedVideoPath']) .where('asset.id', '=', id) .where('asset.type', '=', AssetType.Video) + .where('asset.status', '!=', sql.lit(AssetStatus.Partial)) .executeTakeFirst(); } @@ -281,6 +295,7 @@ export class AssetJobRepository { eb.or([eb('asset_job_status.metadataExtractedAt', 'is', null), eb('asset_job_status.assetId', 'is', null)]), ), ) + .where('asset.status', '!=', sql.lit(AssetStatus.Partial)) .where('asset.deletedAt', 'is', null) .stream(); } @@ -303,6 +318,7 @@ export class AssetJobRepository { 'asset_exif.timeZone', 'asset_exif.fileSizeInByte', ]) + .where('asset.status', '!=', sql.lit(AssetStatus.Partial)) .where('asset.deletedAt', 'is', null); } @@ -324,6 +340,7 @@ export class AssetJobRepository { .selectFrom('asset') .select(['id', 'isOffline']) .where('asset.deletedAt', '<=', trashedBefore) + .where('asset.status', '!=', sql.lit(AssetStatus.Partial)) .stream(); } @@ -336,6 +353,7 @@ export class AssetJobRepository { qb.where((eb) => eb.or([eb('asset.sidecarPath', '=', ''), eb('asset.sidecarPath', 'is', null)])), ) .where('asset.visibility', '!=', AssetVisibility.Hidden) + .where('asset.status', '!=', sql.lit(AssetStatus.Partial)) .stream(); } @@ -344,12 +362,38 @@ export class AssetJobRepository { return this.assetsWithPreviews() .$if(force === false, (qb) => qb.where('job_status.facesRecognizedAt', 'is', null)) .select(['asset.id']) + .where('asset.status', '!=', sql.lit(AssetStatus.Partial)) .orderBy('asset.fileCreatedAt', 'desc') .stream(); } @GenerateSql({ params: [DummyValue.DATE], stream: true }) streamForMigrationJob() { - return this.db.selectFrom('asset').select(['id']).where('asset.deletedAt', 'is', null).stream(); + return this.db + .selectFrom('asset') + .select(['id']) + .where('asset.status', '!=', sql.lit(AssetStatus.Partial)) + .where('asset.deletedAt', 'is', null) + .stream(); + } + + getForPartialAssetCleanupJob(assetId: string) { + return this.db + .selectFrom('asset') + .innerJoin('asset_exif', 'asset.id', 'asset_exif.assetId') + .select(['originalPath as path', 'fileSizeInByte as size', 'checksum', 'fileModifiedAt']) + .where('id', '=', assetId) + .where('status', '=', sql.lit(AssetStatus.Partial)) + .executeTakeFirst(); + } + + @GenerateSql({ params: [DummyValue.DATE], stream: true }) + streamForPartialAssetCleanupJob(createdBefore: Date) { + return this.db + .selectFrom('asset') + .select(['id']) + .where('asset.status', '=', sql.lit(AssetStatus.Partial)) + .where('asset.createdAt', '<', createdBefore) + .stream(); } } diff --git a/server/src/repositories/asset.repository.ts b/server/src/repositories/asset.repository.ts index b59f70a259..3a25383fa1 100644 --- a/server/src/repositories/asset.repository.ts +++ b/server/src/repositories/asset.repository.ts @@ -290,7 +290,7 @@ export class AssetRepository { setComplete(assetId: string) { return this.db .updateTable('asset') - .set({ status: AssetStatus.Active }) + .set({ status: AssetStatus.Active, visibility: AssetVisibility.Timeline }) .where('id', '=', assetId) .where('status', '=', sql.lit(AssetStatus.Partial)) .execute(); diff --git a/server/src/services/asset-media.service.ts b/server/src/services/asset-media.service.ts index dd8fcce91c..39ba1f2b89 100644 --- a/server/src/services/asset-media.service.ts +++ b/server/src/services/asset-media.service.ts @@ -165,6 +165,10 @@ export class AssetMediaService extends BaseService { throw new Error('Asset not found'); } + if (asset.status === AssetStatus.Partial) { + throw new BadRequestException('Cannot replace a partial asset'); + } + this.requireQuota(auth, file.size); await this.replaceFileData(asset.id, dto, file, sidecarFile?.originalPath); diff --git a/server/src/services/asset-upload.service.ts b/server/src/services/asset-upload.service.ts index 9c6d8d5cf6..7ad648d88e 100644 --- a/server/src/services/asset-upload.service.ts +++ b/server/src/services/asset-upload.service.ts @@ -1,14 +1,18 @@ import { BadRequestException, Injectable } from '@nestjs/common'; import { Response } from 'express'; +import { DateTime } from 'luxon'; import { createHash } from 'node:crypto'; import { extname, join } from 'node:path'; import { Readable } from 'node:stream'; +import { JOBS_ASSET_PAGINATION_SIZE } from 'src/constants'; import { StorageCore } from 'src/cores/storage.core'; +import { OnJob } from 'src/decorators'; import { AuthDto } from 'src/dtos/auth.dto'; import { GetUploadStatusDto, ResumeUploadDto, StartUploadDto } from 'src/dtos/upload.dto'; -import { AssetStatus, AssetType, AssetVisibility, JobName, StorageFolder } from 'src/enum'; +import { AssetStatus, AssetType, AssetVisibility, JobName, JobStatus, QueueName, StorageFolder } from 'src/enum'; import { AuthenticatedRequest } from 'src/middleware/auth.guard'; import { BaseService } from 'src/services/base.service'; +import { JobItem, JobOf } from 'src/types'; import { isAssetChecksumConstraint } from 'src/utils/database'; import { mimeTypes } from 'src/utils/mime-types'; import { withRetry } from 'src/utils/misc'; @@ -49,7 +53,7 @@ export class AssetUploadService extends BaseService { type: type, isFavorite: assetData.isFavorite, duration: assetData.duration || null, - visibility: assetData.visibility || AssetVisibility.Timeline, + visibility: AssetVisibility.Hidden, originalFileName: assetData.filename, status: AssetStatus.Partial, }, @@ -222,6 +226,44 @@ export class AssetUploadService extends BaseService { }); } + @OnJob({ name: JobName.PartialAssetDeleteQueueAll, queue: QueueName.BackgroundTask }) + async removeStaleUploads(): Promise { + // TODO: make this configurable + const createdBefore = DateTime.now().minus({ days: 7 }).toJSDate(); + let jobs: JobItem[] = []; + const assets = this.assetJobRepository.streamForPartialAssetCleanupJob(createdBefore); + for await (const asset of assets) { + jobs.push({ name: JobName.AssetFileMigration, data: asset }); + if (jobs.length >= JOBS_ASSET_PAGINATION_SIZE) { + await this.jobRepository.queueAll(jobs); + jobs = []; + } + } + await this.jobRepository.queueAll(jobs); + } + + @OnJob({ name: JobName.PartialAssetDelete, queue: QueueName.BackgroundTask }) + removeStaleUpload({ id }: JobOf): Promise { + return this.databaseRepository.withUuidLock(id, async () => { + const asset = await this.assetJobRepository.getForPartialAssetCleanupJob(id); + if (!asset) { + return JobStatus.Skipped; + } + const { checksum, fileModifiedAt, path, size } = asset; + try { + const stat = await this.storageRepository.stat(path); + if (size === stat.size && checksum === (await this.cryptoRepository.hashFile(path))) { + await this.onComplete({ id, path, fileModifiedAt }); + return JobStatus.Success; + } + } catch (error: any) { + this.logger.debugFn(() => `Failed to check upload file ${path}: ${error.message}`); + } + await this.onCancel(id, path); + return JobStatus.Success; + }); + } + private pipe(req: Readable, res: Response, { id, path, size }: { id: string; path: string; size: number }) { const writeStream = this.storageRepository.createOrAppendWriteStream(path); writeStream.on('error', (error) => { diff --git a/server/src/types.ts b/server/src/types.ts index da3889ef7c..121581c8be 100644 --- a/server/src/types.ts +++ b/server/src/types.ts @@ -352,6 +352,8 @@ export type JobItem = | { name: JobName.PersonCleanup; data?: IBaseJob } | { name: JobName.AssetDelete; data: IAssetDeleteJob } | { name: JobName.AssetDeleteCheck; data?: IBaseJob } + | { name: JobName.PartialAssetDelete; data: IEntityJob } + | { name: JobName.PartialAssetDeleteQueueAll; data: IBaseJob } // Library Management | { name: JobName.LibrarySyncFiles; data: ILibraryFileJob }