diff --git a/server/src/enum.ts b/server/src/enum.ts index 4b7c1ab3c1..6edef45f31 100644 --- a/server/src/enum.ts +++ b/server/src/enum.ts @@ -531,8 +531,8 @@ export enum JobName { AssetFileMigration = 'AssetFileMigration', AssetGenerateThumbnailsQueueAll = 'AssetGenerateThumbnailsQueueAll', AssetGenerateThumbnails = 'AssetGenerateThumbnails', - PartialAssetDelete = 'PartialAssetCleanup', - PartialAssetDeleteQueueAll = 'PartialAssetCleanupQueueAll', + PartialAssetCleanup = 'PartialAssetCleanup', + PartialAssetCleanupQueueAll = 'PartialAssetCleanupQueueAll', AuditLogCleanup = 'AuditLogCleanup', AuditTableCleanup = 'AuditTableCleanup', diff --git a/server/src/repositories/asset.repository.ts b/server/src/repositories/asset.repository.ts index 3a25383fa1..5ec8d3a86a 100644 --- a/server/src/repositories/asset.repository.ts +++ b/server/src/repositories/asset.repository.ts @@ -287,8 +287,8 @@ export class AssetRepository { .executeTakeFirst(); } - setComplete(assetId: string) { - return this.db + async setComplete(assetId: string) { + await this.db .updateTable('asset') .set({ status: AssetStatus.Active, visibility: AssetVisibility.Timeline }) .where('id', '=', assetId) diff --git a/server/src/services/asset-upload.service.spec.ts b/server/src/services/asset-upload.service.spec.ts new file mode 100644 index 0000000000..ee2d8e50d0 --- /dev/null +++ b/server/src/services/asset-upload.service.spec.ts @@ -0,0 +1,479 @@ +import { BadRequestException, InternalServerErrorException } from '@nestjs/common'; +import { StructuredBoolean } from 'src/dtos/upload.dto'; +import { AssetMetadataKey, AssetStatus, AssetType, AssetVisibility, JobName, JobStatus } from 'src/enum'; +import { AssetUploadService } from 'src/services/asset-upload.service'; +import { ASSET_CHECKSUM_CONSTRAINT } from 'src/utils/database'; +import { authStub } from 'test/fixtures/auth.stub'; +import { factory } from 'test/small.factory'; +import { newTestService, ServiceMocks } from 'test/utils'; + +describe(AssetUploadService.name, () => { + let sut: AssetUploadService; + let mocks: ServiceMocks; + + beforeEach(() => { + ({ sut, mocks } = newTestService(AssetUploadService)); + }); + + describe('onStart', () => { + const mockDto = { + assetData: { + filename: 'test.jpg', + deviceAssetId: 'device-asset-1', + deviceId: 'device-1', + fileCreatedAt: new Date('2025-01-01T00:00:00Z'), + fileModifiedAt: new Date('2025-01-01T12:00:00Z'), + isFavorite: false, + iCloudId: '' + }, + checksum: Buffer.from('checksum'), + uploadLength: 1024, + uploadComplete: StructuredBoolean.True, + uploadIncomplete: StructuredBoolean.False, + contentLength: 1024, + isComplete: true, + version: 8, + }; + + it('should create a new asset and return upload metadata', async () => { + const assetId = factory.uuid(); + mocks.crypto.randomUUID.mockReturnValue(assetId); + + const result = await sut.onStart(authStub.user1, mockDto); + + expect(result).toEqual({ + id: assetId, + path: expect.stringContaining(assetId), + status: AssetStatus.Partial, + isDuplicate: false, + }); + + expect(mocks.asset.createWithMetadata).toHaveBeenCalledWith( + expect.objectContaining({ + id: assetId, + ownerId: authStub.user1.user.id, + checksum: mockDto.checksum, + deviceAssetId: mockDto.assetData.deviceAssetId, + deviceId: mockDto.assetData.deviceId, + fileCreatedAt: mockDto.assetData.fileCreatedAt, + fileModifiedAt: mockDto.assetData.fileModifiedAt, + type: AssetType.Image, + isFavorite: false, + status: AssetStatus.Partial, + visibility: AssetVisibility.Hidden, + originalFileName: 'test.jpg', + }), + 1024, + undefined, + ); + + expect(mocks.storage.mkdir).toHaveBeenCalledWith(expect.stringContaining(authStub.user1.user.id)); + }); + + it('should determine asset type from filename extension', async () => { + const videoDto = { ...mockDto, assetData: { ...mockDto.assetData, filename: 'video.mp4' } }; + mocks.crypto.randomUUID.mockReturnValue(factory.uuid()); + + await sut.onStart(authStub.user1, videoDto); + + expect(mocks.asset.createWithMetadata).toHaveBeenCalledWith( + expect.objectContaining({ + type: AssetType.Video, + }), + expect.anything(), + undefined, + ); + }); + + it('should throw BadRequestException for unsupported file types', async () => { + const unsupportedDto = { ...mockDto, assetData: { ...mockDto.assetData, filename: 'document.xyz' } }; + + await expect(sut.onStart(authStub.user1, unsupportedDto)).rejects.toThrow(BadRequestException); + await expect(sut.onStart(authStub.user1, unsupportedDto)).rejects.toThrow('unsupported file type'); + }); + + it('should validate quota before creating asset', async () => { + const authWithQuota = { + ...authStub.user1, + user: { + ...authStub.user1.user, + quotaSizeInBytes: 2000, + quotaUsageInBytes: 1500, + }, + }; + + await expect(sut.onStart(authWithQuota, mockDto)).rejects.toThrow(BadRequestException); + await expect(sut.onStart(authWithQuota, mockDto)).rejects.toThrow('Quota has been exceeded'); + }); + + it('should allow upload when quota is null (unlimited)', async () => { + const authWithUnlimitedQuota = { + ...authStub.user1, + user: { + ...authStub.user1.user, + quotaSizeInBytes: null, + quotaUsageInBytes: 1000, + }, + }; + + mocks.crypto.randomUUID.mockReturnValue(factory.uuid()); + + await expect(sut.onStart(authWithUnlimitedQuota, mockDto)).resolves.toBeDefined(); + }); + + it('should allow upload when within quota', async () => { + const authWithQuota = { + ...authStub.user1, + user: { + ...authStub.user1.user, + quotaSizeInBytes: 5000, + quotaUsageInBytes: 1000, + }, + }; + + mocks.crypto.randomUUID.mockReturnValue(factory.uuid()); + + const result = await sut.onStart(authWithQuota, mockDto); + + expect(result.isDuplicate).toBe(false); + }); + + it('should handle duplicate detection via checksum constraint', async () => { + const existingAssetId = factory.uuid(); + const checksumError = new Error('duplicate key value violates unique constraint'); + (checksumError as any).constraint_name = ASSET_CHECKSUM_CONSTRAINT; + + mocks.asset.createWithMetadata.mockRejectedValue(checksumError); + mocks.asset.getUploadAssetIdByChecksum.mockResolvedValue({ + id: existingAssetId, + status: AssetStatus.Partial, + createdAt: new Date(), + }); + + const result = await sut.onStart(authStub.user1, mockDto); + + expect(result).toEqual({ + id: existingAssetId, + path: expect.any(String), + status: AssetStatus.Partial, + isDuplicate: true, + }); + + expect(mocks.asset.getUploadAssetIdByChecksum).toHaveBeenCalledWith(authStub.user1.user.id, mockDto.checksum); + }); + + it('should throw InternalServerErrorException if duplicate lookup fails', async () => { + const checksumError = new Error('duplicate key value violates unique constraint'); + (checksumError as any).constraint_name = ASSET_CHECKSUM_CONSTRAINT; + + mocks.asset.createWithMetadata.mockRejectedValue(checksumError); + mocks.asset.getUploadAssetIdByChecksum.mockResolvedValue(undefined); + + await expect(sut.onStart(authStub.user1, mockDto)).rejects.toThrow(InternalServerErrorException); + }); + + it('should throw InternalServerErrorException for non-checksum errors', async () => { + const genericError = new Error('database connection failed'); + mocks.asset.createWithMetadata.mockRejectedValue(genericError); + + await expect(sut.onStart(authStub.user1, mockDto)).rejects.toThrow(InternalServerErrorException); + }); + + it('should include iCloud metadata when provided', async () => { + const dtoWithICloud = { + ...mockDto, + assetData: { + ...mockDto.assetData, + iCloudId: 'icloud-123', + }, + }; + + mocks.crypto.randomUUID.mockReturnValue(factory.uuid()); + + await sut.onStart(authStub.user1, dtoWithICloud); + + expect(mocks.asset.createWithMetadata).toHaveBeenCalledWith(expect.anything(), expect.anything(), [ + { key: AssetMetadataKey.MobileApp, value: { iCloudId: 'icloud-123' } }, + ]); + }); + + it('should include duration for video assets', async () => { + const videoDto = { + ...mockDto, + assetData: { + ...mockDto.assetData, + filename: 'video.mp4', + duration: '00:05:30', + }, + }; + + mocks.crypto.randomUUID.mockReturnValue(factory.uuid()); + + await sut.onStart(authStub.user1, videoDto); + + expect(mocks.asset.createWithMetadata).toHaveBeenCalledWith( + expect.objectContaining({ + duration: '00:05:30', + }), + expect.anything(), + undefined, + ); + }); + + it('should set isFavorite when true', async () => { + const favoriteDto = { + ...mockDto, + assetData: { + ...mockDto.assetData, + isFavorite: true, + }, + }; + + mocks.crypto.randomUUID.mockReturnValue(factory.uuid()); + + await sut.onStart(authStub.user1, favoriteDto); + + expect(mocks.asset.createWithMetadata).toHaveBeenCalledWith( + expect.objectContaining({ + isFavorite: true, + }), + expect.anything(), + undefined, + ); + }); + }); + + describe('onComplete', () => { + const assetId = factory.uuid(); + const path = `/upload/${assetId}/file.jpg`; + const fileModifiedAt = new Date('2025-01-01T12:00:00Z'); + + it('should mark asset as complete and queue metadata extraction job', async () => { + await sut.onComplete({ id: assetId, path, fileModifiedAt }); + + expect(mocks.asset.setComplete).toHaveBeenCalledWith(assetId); + expect(mocks.job.queue).toHaveBeenCalledWith({ + name: JobName.AssetExtractMetadata, + data: { id: assetId, source: 'upload' }, + }); + }); + + it('should update file modification time', async () => { + await sut.onComplete({ id: assetId, path, fileModifiedAt }); + + expect(mocks.storage.utimes).toHaveBeenCalledWith(path, expect.any(Date), fileModifiedAt); + }); + + it('should handle utimes failure gracefully', async () => { + mocks.storage.utimes.mockRejectedValue(new Error('Permission denied')); + + await expect(sut.onComplete({ id: assetId, path, fileModifiedAt })).resolves.toBeUndefined(); + + // Should still complete asset and queue job + expect(mocks.asset.setComplete).toHaveBeenCalled(); + expect(mocks.job.queue).toHaveBeenCalled(); + }); + + it('should retry setComplete on transient failures', async () => { + mocks.asset.setComplete + .mockRejectedValueOnce(new Error('Transient error')) + .mockRejectedValueOnce(new Error('Transient error')) + .mockResolvedValue(); + + await sut.onComplete({ id: assetId, path, fileModifiedAt }); + + expect(mocks.asset.setComplete).toHaveBeenCalledTimes(3); + }); + + it('should retry job queueing on transient failures', async () => { + mocks.job.queue.mockRejectedValueOnce(new Error('Transient error')).mockResolvedValue(); + + await sut.onComplete({ id: assetId, path, fileModifiedAt }); + + expect(mocks.job.queue).toHaveBeenCalledTimes(2); + }); + }); + + describe('onCancel', () => { + const assetId = factory.uuid(); + const path = `/upload/${assetId}/file.jpg`; + + it('should delete file and remove asset record', async () => { + await sut.onCancel(assetId, path); + + expect(mocks.storage.unlink).toHaveBeenCalledWith(path); + expect(mocks.asset.removeAndDecrementQuota).toHaveBeenCalledWith(assetId); + }); + + it('should retry unlink on transient failures', async () => { + mocks.storage.unlink.mockRejectedValueOnce(new Error('Transient error')).mockResolvedValue(); + + await sut.onCancel(assetId, path); + + expect(mocks.storage.unlink).toHaveBeenCalledTimes(2); + }); + + it('should retry removeAndDecrementQuota on transient failures', async () => { + mocks.asset.removeAndDecrementQuota.mockRejectedValueOnce(new Error('Transient error')).mockResolvedValue(); + + await sut.onCancel(assetId, path); + + expect(mocks.asset.removeAndDecrementQuota).toHaveBeenCalledTimes(2); + }); + }); + + describe('removeStaleUploads', () => { + it('should queue cleanup jobs for stale partial assets', async () => { + const staleAssets = [{ id: factory.uuid() }, { id: factory.uuid() }, { id: factory.uuid() }]; + + mocks.assetJob.streamForPartialAssetCleanupJob.mockReturnValue( + (async function* () { + for (const asset of staleAssets) { + yield asset; + } + })(), + ); + + await sut.removeStaleUploads(); + + expect(mocks.assetJob.streamForPartialAssetCleanupJob).toHaveBeenCalledWith(expect.any(Date)); + + expect(mocks.job.queueAll).toHaveBeenCalledWith([ + { name: JobName.PartialAssetCleanup, data: staleAssets[0] }, + { name: JobName.PartialAssetCleanup, data: staleAssets[1] }, + { name: JobName.PartialAssetCleanup, data: staleAssets[2] }, + ]); + }); + + it('should batch cleanup jobs', async () => { + const assets = Array.from({ length: 1500 }, () => ({ id: factory.uuid() })); + + mocks.assetJob.streamForPartialAssetCleanupJob.mockReturnValue( + (async function* () { + for (const asset of assets) { + yield asset; + } + })(), + ); + + await sut.removeStaleUploads(); + + // Should be called twice: once for 1000, once for 500 + expect(mocks.job.queueAll).toHaveBeenCalledTimes(2); + }); + + it('should handle empty stream', async () => { + mocks.assetJob.streamForPartialAssetCleanupJob.mockReturnValue((async function* () {})()); + + await sut.removeStaleUploads(); + + expect(mocks.job.queueAll).toHaveBeenCalledWith([]); + }); + }); + + describe('removeStaleUpload', () => { + const assetId = factory.uuid(); + const path = `/upload/${assetId}/file.jpg`; + + it('should skip if asset not found', async () => { + mocks.assetJob.getForPartialAssetCleanupJob.mockResolvedValue(undefined); + + const result = await sut.removeStaleUpload({ id: assetId }); + + expect(result).toBe(JobStatus.Skipped); + expect(mocks.storage.stat).not.toHaveBeenCalled(); + }); + + it('should complete asset if file matches expected state', async () => { + const checksum = Buffer.from('checksum'); + const fileModifiedAt = new Date(); + + mocks.assetJob.getForPartialAssetCleanupJob.mockResolvedValue({ + path, + checksum, + fileModifiedAt, + size: 1024, + }); + + mocks.storage.stat.mockResolvedValue({ size: 1024 } as any); + mocks.crypto.hashFile.mockResolvedValue(checksum); + + const result = await sut.removeStaleUpload({ id: assetId }); + + expect(result).toBe(JobStatus.Success); + expect(mocks.asset.setComplete).toHaveBeenCalledWith(assetId); + expect(mocks.storage.unlink).not.toHaveBeenCalled(); + }); + + it('should cancel asset if file size does not match', async () => { + mocks.assetJob.getForPartialAssetCleanupJob.mockResolvedValue({ + path, + checksum: Buffer.from('checksum'), + fileModifiedAt: new Date(), + size: 1024, + }); + + mocks.storage.stat.mockResolvedValue({ size: 512 } as any); + + const result = await sut.removeStaleUpload({ id: assetId }); + + expect(result).toBe(JobStatus.Success); + expect(mocks.storage.unlink).toHaveBeenCalledWith(path); + expect(mocks.asset.removeAndDecrementQuota).toHaveBeenCalledWith(assetId); + }); + + it('should cancel asset if checksum does not match', async () => { + mocks.assetJob.getForPartialAssetCleanupJob.mockResolvedValue({ + path, + checksum: Buffer.from('expected-checksum'), + fileModifiedAt: new Date(), + size: 1024, + }); + + mocks.storage.stat.mockResolvedValue({ size: 1024 } as any); + mocks.crypto.hashFile.mockResolvedValue(Buffer.from('actual-checksum')); + + const result = await sut.removeStaleUpload({ id: assetId }); + + expect(result).toBe(JobStatus.Success); + expect(mocks.storage.unlink).toHaveBeenCalledWith(path); + expect(mocks.asset.removeAndDecrementQuota).toHaveBeenCalledWith(assetId); + }); + + it('should cancel asset if file does not exist', async () => { + mocks.assetJob.getForPartialAssetCleanupJob.mockResolvedValue({ + path, + checksum: Buffer.from('checksum'), + fileModifiedAt: new Date(), + size: 1024, + }); + + const error = new Error('File not found') as NodeJS.ErrnoException; + error.code = 'ENOENT'; + mocks.storage.stat.mockRejectedValue(error); + + const result = await sut.removeStaleUpload({ id: assetId }); + + expect(result).toBe(JobStatus.Success); + expect(mocks.asset.removeAndDecrementQuota).toHaveBeenCalledWith(assetId); + }); + + it('should cancel asset if stat fails with permission error', async () => { + mocks.assetJob.getForPartialAssetCleanupJob.mockResolvedValue({ + path, + checksum: Buffer.from('checksum'), + fileModifiedAt: new Date(), + size: 1024, + }); + + const error = new Error('Permission denied') as NodeJS.ErrnoException; + error.code = 'EACCES'; + mocks.storage.stat.mockRejectedValue(error); + + const result = await sut.removeStaleUpload({ id: assetId }); + + expect(result).toBe(JobStatus.Success); + expect(mocks.asset.removeAndDecrementQuota).toHaveBeenCalledWith(assetId); + }); + }); +}); diff --git a/server/src/services/asset-upload.service.ts b/server/src/services/asset-upload.service.ts index 3544628395..9c25b113f2 100644 --- a/server/src/services/asset-upload.service.ts +++ b/server/src/services/asset-upload.service.ts @@ -1,4 +1,4 @@ -import { BadRequestException, Injectable } from '@nestjs/common'; +import { BadRequestException, Injectable, InternalServerErrorException } from '@nestjs/common'; import { Response } from 'express'; import { DateTime } from 'luxon'; import { createHash } from 'node:crypto'; @@ -33,59 +33,13 @@ export class AssetUploadService extends BaseService { this.logger.verboseFn(() => `Starting upload: ${JSON.stringify(dto)}`); const { isComplete, assetData, uploadLength, contentLength, version } = dto; - const assetId = this.cryptoRepository.randomUUID(); - const folder = StorageCore.getNestedFolder(StorageFolder.Upload, auth.user.id, assetId); - const extension = extname(assetData.filename); - const path = join(folder, `${assetId}${extension}`); - const type = mimeTypes.assetType(path); - - if (type === AssetType.Other) { - throw new BadRequestException(`${assetData.filename} is an unsupported file type`); - } - - this.validateQuota(auth, uploadLength); - - try { - await this.assetRepository.createWithMetadata( - { - id: assetId, - ownerId: auth.user.id, - libraryId: null, - checksum: dto.checksum, - originalPath: path, - deviceAssetId: assetData.deviceAssetId, - deviceId: assetData.deviceId, - fileCreatedAt: assetData.fileCreatedAt, - fileModifiedAt: assetData.fileModifiedAt, - localDateTime: assetData.fileCreatedAt, - type: type, - isFavorite: assetData.isFavorite, - duration: assetData.duration || null, - visibility: AssetVisibility.Hidden, - originalFileName: assetData.filename, - status: AssetStatus.Partial, - }, - uploadLength, - assetData.iCloudId ? [{ key: AssetMetadataKey.MobileApp, value: { iCloudId: assetData.iCloudId } }] : undefined, - ); - } catch (error: any) { - if (!isAssetChecksumConstraint(error)) { - this.logger.error(`Error creating upload asset record: ${error.message}`); - res.status(500).send('Error creating upload asset record'); - return; - } - - const duplicate = await this.assetRepository.getUploadAssetIdByChecksum(auth.user.id, dto.checksum); - if (!duplicate) { - res.status(500).send('Error locating duplicate for checksum constraint'); - return; - } - - if (duplicate.status !== AssetStatus.Partial) { + const asset = await this.onStart(auth, dto); + if (asset.isDuplicate) { + if (asset.status !== AssetStatus.Partial) { return this.sendAlreadyCompletedProblem(res); } - const location = `/api/upload/${duplicate.id}`; + const location = `/api/upload/${asset.id}`; if (version <= MAX_RUFH_INTEROP_VERSION) { this.sendInterimResponse(res, location, version); } @@ -98,14 +52,13 @@ export class AssetUploadService extends BaseService { return this.sendInconsistentLengthProblem(res); } - const location = `/api/upload/${assetId}`; + const location = `/api/upload/${asset.id}`; if (version <= MAX_RUFH_INTEROP_VERSION) { this.sendInterimResponse(res, location, version); } - await this.storageRepository.mkdir(folder); let checksumBuffer: Buffer | undefined; - const metadata = { id: assetId, path, size: contentLength, fileModifiedAt: assetData.fileModifiedAt }; + const metadata = { id: asset.id, path: asset.path, size: contentLength, fileModifiedAt: assetData.fileModifiedAt }; const writeStream = this.pipe(req, res, metadata); if (isComplete) { @@ -119,15 +72,15 @@ export class AssetUploadService extends BaseService { if (!isComplete) { return res.status(201).set('Location', location).setHeader('Upload-Limit', 'min-size=0').send(); } - this.logger.log(`Finished upload to ${path}`); + this.logger.log(`Finished upload to ${asset.path}`); if (dto.checksum.compare(checksumBuffer!) !== 0) { - return this.sendChecksumMismatchResponse(res, assetId, path); + return this.sendChecksumMismatchResponse(res, asset.id, asset.path); } this.onComplete(metadata) .then(() => res.status(200).send()) .catch((error) => { - this.logger.error(`Failed to complete upload for ${assetId}: ${error.message}`); + this.logger.error(`Failed to complete upload for ${asset.id}: ${error.message}`); res.status(500).send(); }); }); @@ -230,14 +183,14 @@ export class AssetUploadService extends BaseService { }); } - @OnJob({ name: JobName.PartialAssetDeleteQueueAll, queue: QueueName.BackgroundTask }) + @OnJob({ name: JobName.PartialAssetCleanupQueueAll, queue: QueueName.BackgroundTask }) async removeStaleUploads(): Promise { // TODO: make this configurable const createdBefore = DateTime.now().minus({ days: 7 }).toJSDate(); let jobs: JobItem[] = []; const assets = this.assetJobRepository.streamForPartialAssetCleanupJob(createdBefore); for await (const asset of assets) { - jobs.push({ name: JobName.AssetFileMigration, data: asset }); + jobs.push({ name: JobName.PartialAssetCleanup, data: asset }); if (jobs.length >= JOBS_ASSET_PAGINATION_SIZE) { await this.jobRepository.queueAll(jobs); jobs = []; @@ -246,8 +199,8 @@ export class AssetUploadService extends BaseService { await this.jobRepository.queueAll(jobs); } - @OnJob({ name: JobName.PartialAssetDelete, queue: QueueName.BackgroundTask }) - removeStaleUpload({ id }: JobOf): Promise { + @OnJob({ name: JobName.PartialAssetCleanup, queue: QueueName.BackgroundTask }) + removeStaleUpload({ id }: JobOf): Promise { return this.databaseRepository.withUuidLock(id, async () => { const asset = await this.assetJobRepository.getForPartialAssetCleanupJob(id); if (!asset) { @@ -268,6 +221,81 @@ export class AssetUploadService extends BaseService { }); } + async onStart( + auth: AuthDto, + { assetData, checksum, uploadLength }: StartUploadDto, + ): Promise<{ id: string; path: string; status: AssetStatus; isDuplicate: boolean }> { + const assetId = this.cryptoRepository.randomUUID(); + const folder = StorageCore.getNestedFolder(StorageFolder.Upload, auth.user.id, assetId); + const extension = extname(assetData.filename); + const path = join(folder, `${assetId}${extension}`); + const type = mimeTypes.assetType(path); + + if (type === AssetType.Other) { + throw new BadRequestException(`${assetData.filename} is an unsupported file type`); + } + + this.validateQuota(auth, uploadLength); + + try { + await this.assetRepository.createWithMetadata( + { + id: assetId, + ownerId: auth.user.id, + libraryId: null, + checksum, + originalPath: path, + deviceAssetId: assetData.deviceAssetId, + deviceId: assetData.deviceId, + fileCreatedAt: assetData.fileCreatedAt, + fileModifiedAt: assetData.fileModifiedAt, + localDateTime: assetData.fileCreatedAt, + type: type, + isFavorite: assetData.isFavorite, + duration: assetData.duration || null, + visibility: AssetVisibility.Hidden, + originalFileName: assetData.filename, + status: AssetStatus.Partial, + }, + uploadLength, + assetData.iCloudId ? [{ key: AssetMetadataKey.MobileApp, value: { iCloudId: assetData.iCloudId } }] : undefined, + ); + } catch (error: any) { + if (!isAssetChecksumConstraint(error)) { + this.logger.error(`Error creating upload asset record: ${error.message}`); + throw new InternalServerErrorException('Error creating asset'); + } + + const duplicate = await this.assetRepository.getUploadAssetIdByChecksum(auth.user.id, checksum); + if (!duplicate) { + throw new InternalServerErrorException('Error locating duplicate for checksum constraint'); + } + + return { id: duplicate.id, path, status: duplicate.status, isDuplicate: true }; + } + + await this.storageRepository.mkdir(folder); + return { id: assetId, path, status: AssetStatus.Partial, isDuplicate: false }; + } + + async onComplete({ id, path, fileModifiedAt }: { id: string; path: string; fileModifiedAt: Date }) { + this.logger.debug('Completing upload for asset', id); + const jobData = { name: JobName.AssetExtractMetadata, data: { id: id, source: 'upload' } } as const; + await withRetry(() => this.assetRepository.setComplete(id)); + try { + await withRetry(() => this.storageRepository.utimes(path, new Date(), fileModifiedAt)); + } catch (error: any) { + this.logger.error(`Failed to update times for ${path}: ${error.message}`); + } + await withRetry(() => this.jobRepository.queue(jobData)); + } + + async onCancel(assetId: string, path: string): Promise { + this.logger.debug('Cancelling upload for asset', assetId); + await withRetry(() => this.storageRepository.unlink(path)); + await withRetry(() => this.assetRepository.removeAndDecrementQuota(assetId)); + } + private pipe(req: Readable, res: Response, { id, path, size }: { id: string; path: string; size: number }) { const writeStream = this.storageRepository.createOrAppendWriteStream(path); writeStream.on('error', (error) => { @@ -313,24 +341,6 @@ export class AssetUploadService extends BaseService { return writeStream; } - private async onComplete({ id, path, fileModifiedAt }: { id: string; path: string; fileModifiedAt: Date }) { - this.logger.debug('Completing upload for asset', id); - const jobData = { name: JobName.AssetExtractMetadata, data: { id: id, source: 'upload' } } as const; - await withRetry(() => this.assetRepository.setComplete(id)); - try { - await withRetry(() => this.storageRepository.utimes(path, new Date(), fileModifiedAt)); - } catch (error: any) { - this.logger.error(`Failed to update times for ${path}: ${error.message}`); - } - await withRetry(() => this.jobRepository.queue(jobData)); - } - - private async onCancel(assetId: string, path: string): Promise { - this.logger.debug('Cancelling upload for asset', assetId); - await withRetry(() => this.storageRepository.unlink(path)); - await withRetry(() => this.assetRepository.removeAndDecrementQuota(assetId)); - } - private sendInterimResponse({ socket }: Response, location: string, interopVersion: number): void { if (socket && !socket.destroyed) { // Express doesn't understand interim responses, so write directly to socket diff --git a/server/src/types.ts b/server/src/types.ts index 121581c8be..ce321eab8e 100644 --- a/server/src/types.ts +++ b/server/src/types.ts @@ -352,8 +352,8 @@ export type JobItem = | { name: JobName.PersonCleanup; data?: IBaseJob } | { name: JobName.AssetDelete; data: IAssetDeleteJob } | { name: JobName.AssetDeleteCheck; data?: IBaseJob } - | { name: JobName.PartialAssetDelete; data: IEntityJob } - | { name: JobName.PartialAssetDeleteQueueAll; data: IBaseJob } + | { name: JobName.PartialAssetCleanup; data: IEntityJob } + | { name: JobName.PartialAssetCleanupQueueAll; data: IBaseJob } // Library Management | { name: JobName.LibrarySyncFiles; data: ILibraryFileJob }