clean up stale uploads

stale upload cleanup

try/catch file check
This commit is contained in:
mertalev 2025-10-02 18:34:49 -04:00
parent 071dbc1c50
commit 0105c9e2b6
No known key found for this signature in database
GPG key ID: DF6ABC77AAD98C95
7 changed files with 102 additions and 14 deletions

View file

@ -2,8 +2,8 @@ import { BadRequestException } from '@nestjs/common';
import { Expose, plainToInstance, Transform, Type } from 'class-transformer';
import { Equals, IsArray, IsEnum, IsInt, IsNotEmpty, IsString, Min, ValidateIf, ValidateNested } from 'class-validator';
import { AssetMetadataUpsertItemDto } from 'src/dtos/asset.dto';
import { AssetVisibility, ImmichHeader } from 'src/enum';
import { Optional, ValidateBoolean, ValidateDate, ValidateEnum, ValidateUUID } from 'src/validation';
import { ImmichHeader } from 'src/enum';
import { Optional, ValidateBoolean, ValidateDate } from 'src/validation';
import { parseDictionary } from 'structured-headers';
export class UploadAssetDataDto {
@ -32,12 +32,6 @@ export class UploadAssetDataDto {
@ValidateBoolean({ optional: true })
isFavorite?: boolean;
@ValidateEnum({ enum: AssetVisibility, name: 'AssetVisibility', optional: true })
visibility?: AssetVisibility;
@ValidateUUID({ optional: true })
livePhotoVideoId?: string;
@Transform(({ value }) => {
try {
const json = JSON.parse(value);

View file

@ -531,6 +531,8 @@ export enum JobName {
AssetFileMigration = 'AssetFileMigration',
AssetGenerateThumbnailsQueueAll = 'AssetGenerateThumbnailsQueueAll',
AssetGenerateThumbnails = 'AssetGenerateThumbnails',
PartialAssetDelete = 'PartialAssetCleanup',
PartialAssetDeleteQueueAll = 'PartialAssetCleanupQueueAll',
AuditLogCleanup = 'AuditLogCleanup',
AuditTableCleanup = 'AuditTableCleanup',

View file

@ -1,10 +1,10 @@
import { Injectable } from '@nestjs/common';
import { Kysely } from 'kysely';
import { Kysely, sql } from 'kysely';
import { jsonArrayFrom } from 'kysely/helpers/postgres';
import { InjectKysely } from 'nestjs-kysely';
import { Asset, columns } from 'src/database';
import { DummyValue, GenerateSql } from 'src/decorators';
import { AssetFileType, AssetType, AssetVisibility } from 'src/enum';
import { AssetFileType, AssetStatus, AssetType, AssetVisibility } from 'src/enum';
import { DB } from 'src/schema';
import { StorageAsset } from 'src/types';
import {
@ -28,6 +28,7 @@ export class AssetJobRepository {
return this.db
.selectFrom('asset')
.where('asset.id', '=', asUuid(id))
.where('asset.status', '!=', sql.lit(AssetStatus.Partial))
.leftJoin('smart_search', 'asset.id', 'smart_search.assetId')
.select(['id', 'type', 'ownerId', 'duplicateId', 'stackId', 'visibility', 'smart_search.embedding'])
.limit(1)
@ -39,6 +40,7 @@ export class AssetJobRepository {
return this.db
.selectFrom('asset')
.where('asset.id', '=', asUuid(id))
.where('asset.status', '!=', sql.lit(AssetStatus.Partial))
.select(['id', 'sidecarPath', 'originalPath'])
.select((eb) =>
jsonArrayFrom(
@ -58,6 +60,7 @@ export class AssetJobRepository {
return this.db
.selectFrom('asset')
.where('asset.id', '=', asUuid(id))
.where('asset.status', '!=', sql.lit(AssetStatus.Partial))
.select(['id', 'sidecarPath', 'originalPath'])
.limit(1)
.executeTakeFirst();
@ -69,6 +72,7 @@ export class AssetJobRepository {
.selectFrom('asset')
.select(['asset.id', 'asset.thumbhash'])
.select(withFiles)
.where('asset.status', '!=', sql.lit(AssetStatus.Partial))
.where('asset.deletedAt', 'is', null)
.where('asset.visibility', '!=', AssetVisibility.Hidden)
.$if(!force, (qb) =>
@ -93,6 +97,7 @@ export class AssetJobRepository {
.select(['asset.id', 'asset.ownerId', 'asset.encodedVideoPath'])
.select(withFiles)
.where('asset.id', '=', id)
.where('asset.status', '!=', sql.lit(AssetStatus.Partial))
.executeTakeFirst();
}
@ -112,6 +117,7 @@ export class AssetJobRepository {
.select(withFiles)
.$call(withExifInner)
.where('asset.id', '=', id)
.where('asset.status', '!=', sql.lit(AssetStatus.Partial))
.executeTakeFirst();
}
@ -122,6 +128,7 @@ export class AssetJobRepository {
.select(columns.asset)
.select(withFaces)
.where('asset.id', '=', id)
.where('asset.status', '!=', sql.lit(AssetStatus.Partial))
.executeTakeFirst();
}
@ -139,6 +146,7 @@ export class AssetJobRepository {
return this.db
.selectFrom('asset')
.where('asset.visibility', '!=', AssetVisibility.Hidden)
.where('asset.status', '!=', sql.lit(AssetStatus.Partial))
.where('asset.deletedAt', 'is', null)
.innerJoin('asset_job_status as job_status', 'assetId', 'asset.id')
.where('job_status.previewAt', 'is not', null);
@ -149,6 +157,7 @@ export class AssetJobRepository {
return this.db
.selectFrom('asset')
.select(['asset.id'])
.where('asset.status', '!=', sql.lit(AssetStatus.Partial))
.where('asset.deletedAt', 'is', null)
.innerJoin('smart_search', 'asset.id', 'smart_search.assetId')
.$call(withDefaultVisibility)
@ -177,6 +186,7 @@ export class AssetJobRepository {
.select(['asset.id', 'asset.visibility'])
.select((eb) => withFiles(eb, AssetFileType.Preview))
.where('asset.id', '=', id)
.where('asset.status', '!=', sql.lit(AssetStatus.Partial))
.executeTakeFirst();
}
@ -189,6 +199,7 @@ export class AssetJobRepository {
.select((eb) => withFaces(eb, true))
.select((eb) => withFiles(eb, AssetFileType.Preview))
.where('asset.id', '=', id)
.where('asset.status', '!=', sql.lit(AssetStatus.Partial))
.executeTakeFirst();
}
@ -241,6 +252,7 @@ export class AssetJobRepository {
)
.select((eb) => toJson(eb, 'stacked_assets').as('stack'))
.where('asset.id', '=', id)
.where('asset.status', '!=', sql.lit(AssetStatus.Partial))
.executeTakeFirst();
}
@ -255,6 +267,7 @@ export class AssetJobRepository {
.where((eb) => eb.or([eb('asset.encodedVideoPath', 'is', null), eb('asset.encodedVideoPath', '=', '')]))
.where('asset.visibility', '!=', AssetVisibility.Hidden),
)
.where('asset.status', '!=', sql.lit(AssetStatus.Partial))
.where('asset.deletedAt', 'is', null)
.stream();
}
@ -266,6 +279,7 @@ export class AssetJobRepository {
.select(['asset.id', 'asset.ownerId', 'asset.originalPath', 'asset.encodedVideoPath'])
.where('asset.id', '=', id)
.where('asset.type', '=', AssetType.Video)
.where('asset.status', '!=', sql.lit(AssetStatus.Partial))
.executeTakeFirst();
}
@ -281,6 +295,7 @@ export class AssetJobRepository {
eb.or([eb('asset_job_status.metadataExtractedAt', 'is', null), eb('asset_job_status.assetId', 'is', null)]),
),
)
.where('asset.status', '!=', sql.lit(AssetStatus.Partial))
.where('asset.deletedAt', 'is', null)
.stream();
}
@ -303,6 +318,7 @@ export class AssetJobRepository {
'asset_exif.timeZone',
'asset_exif.fileSizeInByte',
])
.where('asset.status', '!=', sql.lit(AssetStatus.Partial))
.where('asset.deletedAt', 'is', null);
}
@ -324,6 +340,7 @@ export class AssetJobRepository {
.selectFrom('asset')
.select(['id', 'isOffline'])
.where('asset.deletedAt', '<=', trashedBefore)
.where('asset.status', '!=', sql.lit(AssetStatus.Partial))
.stream();
}
@ -336,6 +353,7 @@ export class AssetJobRepository {
qb.where((eb) => eb.or([eb('asset.sidecarPath', '=', ''), eb('asset.sidecarPath', 'is', null)])),
)
.where('asset.visibility', '!=', AssetVisibility.Hidden)
.where('asset.status', '!=', sql.lit(AssetStatus.Partial))
.stream();
}
@ -344,12 +362,38 @@ export class AssetJobRepository {
return this.assetsWithPreviews()
.$if(force === false, (qb) => qb.where('job_status.facesRecognizedAt', 'is', null))
.select(['asset.id'])
.where('asset.status', '!=', sql.lit(AssetStatus.Partial))
.orderBy('asset.fileCreatedAt', 'desc')
.stream();
}
@GenerateSql({ params: [DummyValue.DATE], stream: true })
streamForMigrationJob() {
return this.db.selectFrom('asset').select(['id']).where('asset.deletedAt', 'is', null).stream();
return this.db
.selectFrom('asset')
.select(['id'])
.where('asset.status', '!=', sql.lit(AssetStatus.Partial))
.where('asset.deletedAt', 'is', null)
.stream();
}
getForPartialAssetCleanupJob(assetId: string) {
return this.db
.selectFrom('asset')
.innerJoin('asset_exif', 'asset.id', 'asset_exif.assetId')
.select(['originalPath as path', 'fileSizeInByte as size', 'checksum', 'fileModifiedAt'])
.where('id', '=', assetId)
.where('status', '=', sql.lit(AssetStatus.Partial))
.executeTakeFirst();
}
@GenerateSql({ params: [DummyValue.DATE], stream: true })
streamForPartialAssetCleanupJob(createdBefore: Date) {
return this.db
.selectFrom('asset')
.select(['id'])
.where('asset.status', '=', sql.lit(AssetStatus.Partial))
.where('asset.createdAt', '<', createdBefore)
.stream();
}
}

View file

@ -290,7 +290,7 @@ export class AssetRepository {
setComplete(assetId: string) {
return this.db
.updateTable('asset')
.set({ status: AssetStatus.Active })
.set({ status: AssetStatus.Active, visibility: AssetVisibility.Timeline })
.where('id', '=', assetId)
.where('status', '=', sql.lit(AssetStatus.Partial))
.execute();

View file

@ -165,6 +165,10 @@ export class AssetMediaService extends BaseService {
throw new Error('Asset not found');
}
if (asset.status === AssetStatus.Partial) {
throw new BadRequestException('Cannot replace a partial asset');
}
this.requireQuota(auth, file.size);
await this.replaceFileData(asset.id, dto, file, sidecarFile?.originalPath);

View file

@ -1,14 +1,18 @@
import { BadRequestException, Injectable } from '@nestjs/common';
import { Response } from 'express';
import { DateTime } from 'luxon';
import { createHash } from 'node:crypto';
import { extname, join } from 'node:path';
import { Readable } from 'node:stream';
import { JOBS_ASSET_PAGINATION_SIZE } from 'src/constants';
import { StorageCore } from 'src/cores/storage.core';
import { OnJob } from 'src/decorators';
import { AuthDto } from 'src/dtos/auth.dto';
import { GetUploadStatusDto, ResumeUploadDto, StartUploadDto } from 'src/dtos/upload.dto';
import { AssetStatus, AssetType, AssetVisibility, JobName, StorageFolder } from 'src/enum';
import { AssetStatus, AssetType, AssetVisibility, JobName, JobStatus, QueueName, StorageFolder } from 'src/enum';
import { AuthenticatedRequest } from 'src/middleware/auth.guard';
import { BaseService } from 'src/services/base.service';
import { JobItem, JobOf } from 'src/types';
import { isAssetChecksumConstraint } from 'src/utils/database';
import { mimeTypes } from 'src/utils/mime-types';
import { withRetry } from 'src/utils/misc';
@ -49,7 +53,7 @@ export class AssetUploadService extends BaseService {
type: type,
isFavorite: assetData.isFavorite,
duration: assetData.duration || null,
visibility: assetData.visibility || AssetVisibility.Timeline,
visibility: AssetVisibility.Hidden,
originalFileName: assetData.filename,
status: AssetStatus.Partial,
},
@ -222,6 +226,44 @@ export class AssetUploadService extends BaseService {
});
}
@OnJob({ name: JobName.PartialAssetDeleteQueueAll, queue: QueueName.BackgroundTask })
async removeStaleUploads(): Promise<void> {
// TODO: make this configurable
const createdBefore = DateTime.now().minus({ days: 7 }).toJSDate();
let jobs: JobItem[] = [];
const assets = this.assetJobRepository.streamForPartialAssetCleanupJob(createdBefore);
for await (const asset of assets) {
jobs.push({ name: JobName.AssetFileMigration, data: asset });
if (jobs.length >= JOBS_ASSET_PAGINATION_SIZE) {
await this.jobRepository.queueAll(jobs);
jobs = [];
}
}
await this.jobRepository.queueAll(jobs);
}
@OnJob({ name: JobName.PartialAssetDelete, queue: QueueName.BackgroundTask })
removeStaleUpload({ id }: JobOf<JobName.PartialAssetDelete>): Promise<JobStatus> {
return this.databaseRepository.withUuidLock(id, async () => {
const asset = await this.assetJobRepository.getForPartialAssetCleanupJob(id);
if (!asset) {
return JobStatus.Skipped;
}
const { checksum, fileModifiedAt, path, size } = asset;
try {
const stat = await this.storageRepository.stat(path);
if (size === stat.size && checksum === (await this.cryptoRepository.hashFile(path))) {
await this.onComplete({ id, path, fileModifiedAt });
return JobStatus.Success;
}
} catch (error: any) {
this.logger.debugFn(() => `Failed to check upload file ${path}: ${error.message}`);
}
await this.onCancel(id, path);
return JobStatus.Success;
});
}
private pipe(req: Readable, res: Response, { id, path, size }: { id: string; path: string; size: number }) {
const writeStream = this.storageRepository.createOrAppendWriteStream(path);
writeStream.on('error', (error) => {

View file

@ -352,6 +352,8 @@ export type JobItem =
| { name: JobName.PersonCleanup; data?: IBaseJob }
| { name: JobName.AssetDelete; data: IAssetDeleteJob }
| { name: JobName.AssetDeleteCheck; data?: IBaseJob }
| { name: JobName.PartialAssetDelete; data: IEntityJob }
| { name: JobName.PartialAssetDeleteQueueAll; data: IBaseJob }
// Library Management
| { name: JobName.LibrarySyncFiles; data: ILibraryFileJob }