add service tests

This commit is contained in:
mertalev 2025-10-06 19:47:27 -04:00
parent d4f3d9d6a5
commit 484b73eb60
No known key found for this signature in database
GPG key ID: DF6ABC77AAD98C95
5 changed files with 574 additions and 85 deletions

View file

@ -531,8 +531,8 @@ export enum JobName {
AssetFileMigration = 'AssetFileMigration', AssetFileMigration = 'AssetFileMigration',
AssetGenerateThumbnailsQueueAll = 'AssetGenerateThumbnailsQueueAll', AssetGenerateThumbnailsQueueAll = 'AssetGenerateThumbnailsQueueAll',
AssetGenerateThumbnails = 'AssetGenerateThumbnails', AssetGenerateThumbnails = 'AssetGenerateThumbnails',
PartialAssetDelete = 'PartialAssetCleanup', PartialAssetCleanup = 'PartialAssetCleanup',
PartialAssetDeleteQueueAll = 'PartialAssetCleanupQueueAll', PartialAssetCleanupQueueAll = 'PartialAssetCleanupQueueAll',
AuditLogCleanup = 'AuditLogCleanup', AuditLogCleanup = 'AuditLogCleanup',
AuditTableCleanup = 'AuditTableCleanup', AuditTableCleanup = 'AuditTableCleanup',

View file

@ -287,8 +287,8 @@ export class AssetRepository {
.executeTakeFirst(); .executeTakeFirst();
} }
setComplete(assetId: string) { async setComplete(assetId: string) {
return this.db await this.db
.updateTable('asset') .updateTable('asset')
.set({ status: AssetStatus.Active, visibility: AssetVisibility.Timeline }) .set({ status: AssetStatus.Active, visibility: AssetVisibility.Timeline })
.where('id', '=', assetId) .where('id', '=', assetId)

View file

@ -0,0 +1,479 @@
import { BadRequestException, InternalServerErrorException } from '@nestjs/common';
import { StructuredBoolean } from 'src/dtos/upload.dto';
import { AssetMetadataKey, AssetStatus, AssetType, AssetVisibility, JobName, JobStatus } from 'src/enum';
import { AssetUploadService } from 'src/services/asset-upload.service';
import { ASSET_CHECKSUM_CONSTRAINT } from 'src/utils/database';
import { authStub } from 'test/fixtures/auth.stub';
import { factory } from 'test/small.factory';
import { newTestService, ServiceMocks } from 'test/utils';
describe(AssetUploadService.name, () => {
let sut: AssetUploadService;
let mocks: ServiceMocks;
beforeEach(() => {
({ sut, mocks } = newTestService(AssetUploadService));
});
describe('onStart', () => {
const mockDto = {
assetData: {
filename: 'test.jpg',
deviceAssetId: 'device-asset-1',
deviceId: 'device-1',
fileCreatedAt: new Date('2025-01-01T00:00:00Z'),
fileModifiedAt: new Date('2025-01-01T12:00:00Z'),
isFavorite: false,
iCloudId: ''
},
checksum: Buffer.from('checksum'),
uploadLength: 1024,
uploadComplete: StructuredBoolean.True,
uploadIncomplete: StructuredBoolean.False,
contentLength: 1024,
isComplete: true,
version: 8,
};
it('should create a new asset and return upload metadata', async () => {
const assetId = factory.uuid();
mocks.crypto.randomUUID.mockReturnValue(assetId);
const result = await sut.onStart(authStub.user1, mockDto);
expect(result).toEqual({
id: assetId,
path: expect.stringContaining(assetId),
status: AssetStatus.Partial,
isDuplicate: false,
});
expect(mocks.asset.createWithMetadata).toHaveBeenCalledWith(
expect.objectContaining({
id: assetId,
ownerId: authStub.user1.user.id,
checksum: mockDto.checksum,
deviceAssetId: mockDto.assetData.deviceAssetId,
deviceId: mockDto.assetData.deviceId,
fileCreatedAt: mockDto.assetData.fileCreatedAt,
fileModifiedAt: mockDto.assetData.fileModifiedAt,
type: AssetType.Image,
isFavorite: false,
status: AssetStatus.Partial,
visibility: AssetVisibility.Hidden,
originalFileName: 'test.jpg',
}),
1024,
undefined,
);
expect(mocks.storage.mkdir).toHaveBeenCalledWith(expect.stringContaining(authStub.user1.user.id));
});
it('should determine asset type from filename extension', async () => {
const videoDto = { ...mockDto, assetData: { ...mockDto.assetData, filename: 'video.mp4' } };
mocks.crypto.randomUUID.mockReturnValue(factory.uuid());
await sut.onStart(authStub.user1, videoDto);
expect(mocks.asset.createWithMetadata).toHaveBeenCalledWith(
expect.objectContaining({
type: AssetType.Video,
}),
expect.anything(),
undefined,
);
});
it('should throw BadRequestException for unsupported file types', async () => {
const unsupportedDto = { ...mockDto, assetData: { ...mockDto.assetData, filename: 'document.xyz' } };
await expect(sut.onStart(authStub.user1, unsupportedDto)).rejects.toThrow(BadRequestException);
await expect(sut.onStart(authStub.user1, unsupportedDto)).rejects.toThrow('unsupported file type');
});
it('should validate quota before creating asset', async () => {
const authWithQuota = {
...authStub.user1,
user: {
...authStub.user1.user,
quotaSizeInBytes: 2000,
quotaUsageInBytes: 1500,
},
};
await expect(sut.onStart(authWithQuota, mockDto)).rejects.toThrow(BadRequestException);
await expect(sut.onStart(authWithQuota, mockDto)).rejects.toThrow('Quota has been exceeded');
});
it('should allow upload when quota is null (unlimited)', async () => {
const authWithUnlimitedQuota = {
...authStub.user1,
user: {
...authStub.user1.user,
quotaSizeInBytes: null,
quotaUsageInBytes: 1000,
},
};
mocks.crypto.randomUUID.mockReturnValue(factory.uuid());
await expect(sut.onStart(authWithUnlimitedQuota, mockDto)).resolves.toBeDefined();
});
it('should allow upload when within quota', async () => {
const authWithQuota = {
...authStub.user1,
user: {
...authStub.user1.user,
quotaSizeInBytes: 5000,
quotaUsageInBytes: 1000,
},
};
mocks.crypto.randomUUID.mockReturnValue(factory.uuid());
const result = await sut.onStart(authWithQuota, mockDto);
expect(result.isDuplicate).toBe(false);
});
it('should handle duplicate detection via checksum constraint', async () => {
const existingAssetId = factory.uuid();
const checksumError = new Error('duplicate key value violates unique constraint');
(checksumError as any).constraint_name = ASSET_CHECKSUM_CONSTRAINT;
mocks.asset.createWithMetadata.mockRejectedValue(checksumError);
mocks.asset.getUploadAssetIdByChecksum.mockResolvedValue({
id: existingAssetId,
status: AssetStatus.Partial,
createdAt: new Date(),
});
const result = await sut.onStart(authStub.user1, mockDto);
expect(result).toEqual({
id: existingAssetId,
path: expect.any(String),
status: AssetStatus.Partial,
isDuplicate: true,
});
expect(mocks.asset.getUploadAssetIdByChecksum).toHaveBeenCalledWith(authStub.user1.user.id, mockDto.checksum);
});
it('should throw InternalServerErrorException if duplicate lookup fails', async () => {
const checksumError = new Error('duplicate key value violates unique constraint');
(checksumError as any).constraint_name = ASSET_CHECKSUM_CONSTRAINT;
mocks.asset.createWithMetadata.mockRejectedValue(checksumError);
mocks.asset.getUploadAssetIdByChecksum.mockResolvedValue(undefined);
await expect(sut.onStart(authStub.user1, mockDto)).rejects.toThrow(InternalServerErrorException);
});
it('should throw InternalServerErrorException for non-checksum errors', async () => {
const genericError = new Error('database connection failed');
mocks.asset.createWithMetadata.mockRejectedValue(genericError);
await expect(sut.onStart(authStub.user1, mockDto)).rejects.toThrow(InternalServerErrorException);
});
it('should include iCloud metadata when provided', async () => {
const dtoWithICloud = {
...mockDto,
assetData: {
...mockDto.assetData,
iCloudId: 'icloud-123',
},
};
mocks.crypto.randomUUID.mockReturnValue(factory.uuid());
await sut.onStart(authStub.user1, dtoWithICloud);
expect(mocks.asset.createWithMetadata).toHaveBeenCalledWith(expect.anything(), expect.anything(), [
{ key: AssetMetadataKey.MobileApp, value: { iCloudId: 'icloud-123' } },
]);
});
it('should include duration for video assets', async () => {
const videoDto = {
...mockDto,
assetData: {
...mockDto.assetData,
filename: 'video.mp4',
duration: '00:05:30',
},
};
mocks.crypto.randomUUID.mockReturnValue(factory.uuid());
await sut.onStart(authStub.user1, videoDto);
expect(mocks.asset.createWithMetadata).toHaveBeenCalledWith(
expect.objectContaining({
duration: '00:05:30',
}),
expect.anything(),
undefined,
);
});
it('should set isFavorite when true', async () => {
const favoriteDto = {
...mockDto,
assetData: {
...mockDto.assetData,
isFavorite: true,
},
};
mocks.crypto.randomUUID.mockReturnValue(factory.uuid());
await sut.onStart(authStub.user1, favoriteDto);
expect(mocks.asset.createWithMetadata).toHaveBeenCalledWith(
expect.objectContaining({
isFavorite: true,
}),
expect.anything(),
undefined,
);
});
});
describe('onComplete', () => {
const assetId = factory.uuid();
const path = `/upload/${assetId}/file.jpg`;
const fileModifiedAt = new Date('2025-01-01T12:00:00Z');
it('should mark asset as complete and queue metadata extraction job', async () => {
await sut.onComplete({ id: assetId, path, fileModifiedAt });
expect(mocks.asset.setComplete).toHaveBeenCalledWith(assetId);
expect(mocks.job.queue).toHaveBeenCalledWith({
name: JobName.AssetExtractMetadata,
data: { id: assetId, source: 'upload' },
});
});
it('should update file modification time', async () => {
await sut.onComplete({ id: assetId, path, fileModifiedAt });
expect(mocks.storage.utimes).toHaveBeenCalledWith(path, expect.any(Date), fileModifiedAt);
});
it('should handle utimes failure gracefully', async () => {
mocks.storage.utimes.mockRejectedValue(new Error('Permission denied'));
await expect(sut.onComplete({ id: assetId, path, fileModifiedAt })).resolves.toBeUndefined();
// Should still complete asset and queue job
expect(mocks.asset.setComplete).toHaveBeenCalled();
expect(mocks.job.queue).toHaveBeenCalled();
});
it('should retry setComplete on transient failures', async () => {
mocks.asset.setComplete
.mockRejectedValueOnce(new Error('Transient error'))
.mockRejectedValueOnce(new Error('Transient error'))
.mockResolvedValue();
await sut.onComplete({ id: assetId, path, fileModifiedAt });
expect(mocks.asset.setComplete).toHaveBeenCalledTimes(3);
});
it('should retry job queueing on transient failures', async () => {
mocks.job.queue.mockRejectedValueOnce(new Error('Transient error')).mockResolvedValue();
await sut.onComplete({ id: assetId, path, fileModifiedAt });
expect(mocks.job.queue).toHaveBeenCalledTimes(2);
});
});
describe('onCancel', () => {
const assetId = factory.uuid();
const path = `/upload/${assetId}/file.jpg`;
it('should delete file and remove asset record', async () => {
await sut.onCancel(assetId, path);
expect(mocks.storage.unlink).toHaveBeenCalledWith(path);
expect(mocks.asset.removeAndDecrementQuota).toHaveBeenCalledWith(assetId);
});
it('should retry unlink on transient failures', async () => {
mocks.storage.unlink.mockRejectedValueOnce(new Error('Transient error')).mockResolvedValue();
await sut.onCancel(assetId, path);
expect(mocks.storage.unlink).toHaveBeenCalledTimes(2);
});
it('should retry removeAndDecrementQuota on transient failures', async () => {
mocks.asset.removeAndDecrementQuota.mockRejectedValueOnce(new Error('Transient error')).mockResolvedValue();
await sut.onCancel(assetId, path);
expect(mocks.asset.removeAndDecrementQuota).toHaveBeenCalledTimes(2);
});
});
describe('removeStaleUploads', () => {
it('should queue cleanup jobs for stale partial assets', async () => {
const staleAssets = [{ id: factory.uuid() }, { id: factory.uuid() }, { id: factory.uuid() }];
mocks.assetJob.streamForPartialAssetCleanupJob.mockReturnValue(
(async function* () {
for (const asset of staleAssets) {
yield asset;
}
})(),
);
await sut.removeStaleUploads();
expect(mocks.assetJob.streamForPartialAssetCleanupJob).toHaveBeenCalledWith(expect.any(Date));
expect(mocks.job.queueAll).toHaveBeenCalledWith([
{ name: JobName.PartialAssetCleanup, data: staleAssets[0] },
{ name: JobName.PartialAssetCleanup, data: staleAssets[1] },
{ name: JobName.PartialAssetCleanup, data: staleAssets[2] },
]);
});
it('should batch cleanup jobs', async () => {
const assets = Array.from({ length: 1500 }, () => ({ id: factory.uuid() }));
mocks.assetJob.streamForPartialAssetCleanupJob.mockReturnValue(
(async function* () {
for (const asset of assets) {
yield asset;
}
})(),
);
await sut.removeStaleUploads();
// Should be called twice: once for 1000, once for 500
expect(mocks.job.queueAll).toHaveBeenCalledTimes(2);
});
it('should handle empty stream', async () => {
mocks.assetJob.streamForPartialAssetCleanupJob.mockReturnValue((async function* () {})());
await sut.removeStaleUploads();
expect(mocks.job.queueAll).toHaveBeenCalledWith([]);
});
});
describe('removeStaleUpload', () => {
const assetId = factory.uuid();
const path = `/upload/${assetId}/file.jpg`;
it('should skip if asset not found', async () => {
mocks.assetJob.getForPartialAssetCleanupJob.mockResolvedValue(undefined);
const result = await sut.removeStaleUpload({ id: assetId });
expect(result).toBe(JobStatus.Skipped);
expect(mocks.storage.stat).not.toHaveBeenCalled();
});
it('should complete asset if file matches expected state', async () => {
const checksum = Buffer.from('checksum');
const fileModifiedAt = new Date();
mocks.assetJob.getForPartialAssetCleanupJob.mockResolvedValue({
path,
checksum,
fileModifiedAt,
size: 1024,
});
mocks.storage.stat.mockResolvedValue({ size: 1024 } as any);
mocks.crypto.hashFile.mockResolvedValue(checksum);
const result = await sut.removeStaleUpload({ id: assetId });
expect(result).toBe(JobStatus.Success);
expect(mocks.asset.setComplete).toHaveBeenCalledWith(assetId);
expect(mocks.storage.unlink).not.toHaveBeenCalled();
});
it('should cancel asset if file size does not match', async () => {
mocks.assetJob.getForPartialAssetCleanupJob.mockResolvedValue({
path,
checksum: Buffer.from('checksum'),
fileModifiedAt: new Date(),
size: 1024,
});
mocks.storage.stat.mockResolvedValue({ size: 512 } as any);
const result = await sut.removeStaleUpload({ id: assetId });
expect(result).toBe(JobStatus.Success);
expect(mocks.storage.unlink).toHaveBeenCalledWith(path);
expect(mocks.asset.removeAndDecrementQuota).toHaveBeenCalledWith(assetId);
});
it('should cancel asset if checksum does not match', async () => {
mocks.assetJob.getForPartialAssetCleanupJob.mockResolvedValue({
path,
checksum: Buffer.from('expected-checksum'),
fileModifiedAt: new Date(),
size: 1024,
});
mocks.storage.stat.mockResolvedValue({ size: 1024 } as any);
mocks.crypto.hashFile.mockResolvedValue(Buffer.from('actual-checksum'));
const result = await sut.removeStaleUpload({ id: assetId });
expect(result).toBe(JobStatus.Success);
expect(mocks.storage.unlink).toHaveBeenCalledWith(path);
expect(mocks.asset.removeAndDecrementQuota).toHaveBeenCalledWith(assetId);
});
it('should cancel asset if file does not exist', async () => {
mocks.assetJob.getForPartialAssetCleanupJob.mockResolvedValue({
path,
checksum: Buffer.from('checksum'),
fileModifiedAt: new Date(),
size: 1024,
});
const error = new Error('File not found') as NodeJS.ErrnoException;
error.code = 'ENOENT';
mocks.storage.stat.mockRejectedValue(error);
const result = await sut.removeStaleUpload({ id: assetId });
expect(result).toBe(JobStatus.Success);
expect(mocks.asset.removeAndDecrementQuota).toHaveBeenCalledWith(assetId);
});
it('should cancel asset if stat fails with permission error', async () => {
mocks.assetJob.getForPartialAssetCleanupJob.mockResolvedValue({
path,
checksum: Buffer.from('checksum'),
fileModifiedAt: new Date(),
size: 1024,
});
const error = new Error('Permission denied') as NodeJS.ErrnoException;
error.code = 'EACCES';
mocks.storage.stat.mockRejectedValue(error);
const result = await sut.removeStaleUpload({ id: assetId });
expect(result).toBe(JobStatus.Success);
expect(mocks.asset.removeAndDecrementQuota).toHaveBeenCalledWith(assetId);
});
});
});

View file

@ -1,4 +1,4 @@
import { BadRequestException, Injectable } from '@nestjs/common'; import { BadRequestException, Injectable, InternalServerErrorException } from '@nestjs/common';
import { Response } from 'express'; import { Response } from 'express';
import { DateTime } from 'luxon'; import { DateTime } from 'luxon';
import { createHash } from 'node:crypto'; import { createHash } from 'node:crypto';
@ -33,59 +33,13 @@ export class AssetUploadService extends BaseService {
this.logger.verboseFn(() => `Starting upload: ${JSON.stringify(dto)}`); this.logger.verboseFn(() => `Starting upload: ${JSON.stringify(dto)}`);
const { isComplete, assetData, uploadLength, contentLength, version } = dto; const { isComplete, assetData, uploadLength, contentLength, version } = dto;
const assetId = this.cryptoRepository.randomUUID(); const asset = await this.onStart(auth, dto);
const folder = StorageCore.getNestedFolder(StorageFolder.Upload, auth.user.id, assetId); if (asset.isDuplicate) {
const extension = extname(assetData.filename); if (asset.status !== AssetStatus.Partial) {
const path = join(folder, `${assetId}${extension}`);
const type = mimeTypes.assetType(path);
if (type === AssetType.Other) {
throw new BadRequestException(`${assetData.filename} is an unsupported file type`);
}
this.validateQuota(auth, uploadLength);
try {
await this.assetRepository.createWithMetadata(
{
id: assetId,
ownerId: auth.user.id,
libraryId: null,
checksum: dto.checksum,
originalPath: path,
deviceAssetId: assetData.deviceAssetId,
deviceId: assetData.deviceId,
fileCreatedAt: assetData.fileCreatedAt,
fileModifiedAt: assetData.fileModifiedAt,
localDateTime: assetData.fileCreatedAt,
type: type,
isFavorite: assetData.isFavorite,
duration: assetData.duration || null,
visibility: AssetVisibility.Hidden,
originalFileName: assetData.filename,
status: AssetStatus.Partial,
},
uploadLength,
assetData.iCloudId ? [{ key: AssetMetadataKey.MobileApp, value: { iCloudId: assetData.iCloudId } }] : undefined,
);
} catch (error: any) {
if (!isAssetChecksumConstraint(error)) {
this.logger.error(`Error creating upload asset record: ${error.message}`);
res.status(500).send('Error creating upload asset record');
return;
}
const duplicate = await this.assetRepository.getUploadAssetIdByChecksum(auth.user.id, dto.checksum);
if (!duplicate) {
res.status(500).send('Error locating duplicate for checksum constraint');
return;
}
if (duplicate.status !== AssetStatus.Partial) {
return this.sendAlreadyCompletedProblem(res); return this.sendAlreadyCompletedProblem(res);
} }
const location = `/api/upload/${duplicate.id}`; const location = `/api/upload/${asset.id}`;
if (version <= MAX_RUFH_INTEROP_VERSION) { if (version <= MAX_RUFH_INTEROP_VERSION) {
this.sendInterimResponse(res, location, version); this.sendInterimResponse(res, location, version);
} }
@ -98,14 +52,13 @@ export class AssetUploadService extends BaseService {
return this.sendInconsistentLengthProblem(res); return this.sendInconsistentLengthProblem(res);
} }
const location = `/api/upload/${assetId}`; const location = `/api/upload/${asset.id}`;
if (version <= MAX_RUFH_INTEROP_VERSION) { if (version <= MAX_RUFH_INTEROP_VERSION) {
this.sendInterimResponse(res, location, version); this.sendInterimResponse(res, location, version);
} }
await this.storageRepository.mkdir(folder);
let checksumBuffer: Buffer | undefined; let checksumBuffer: Buffer | undefined;
const metadata = { id: assetId, path, size: contentLength, fileModifiedAt: assetData.fileModifiedAt }; const metadata = { id: asset.id, path: asset.path, size: contentLength, fileModifiedAt: assetData.fileModifiedAt };
const writeStream = this.pipe(req, res, metadata); const writeStream = this.pipe(req, res, metadata);
if (isComplete) { if (isComplete) {
@ -119,15 +72,15 @@ export class AssetUploadService extends BaseService {
if (!isComplete) { if (!isComplete) {
return res.status(201).set('Location', location).setHeader('Upload-Limit', 'min-size=0').send(); return res.status(201).set('Location', location).setHeader('Upload-Limit', 'min-size=0').send();
} }
this.logger.log(`Finished upload to ${path}`); this.logger.log(`Finished upload to ${asset.path}`);
if (dto.checksum.compare(checksumBuffer!) !== 0) { if (dto.checksum.compare(checksumBuffer!) !== 0) {
return this.sendChecksumMismatchResponse(res, assetId, path); return this.sendChecksumMismatchResponse(res, asset.id, asset.path);
} }
this.onComplete(metadata) this.onComplete(metadata)
.then(() => res.status(200).send()) .then(() => res.status(200).send())
.catch((error) => { .catch((error) => {
this.logger.error(`Failed to complete upload for ${assetId}: ${error.message}`); this.logger.error(`Failed to complete upload for ${asset.id}: ${error.message}`);
res.status(500).send(); res.status(500).send();
}); });
}); });
@ -230,14 +183,14 @@ export class AssetUploadService extends BaseService {
}); });
} }
@OnJob({ name: JobName.PartialAssetDeleteQueueAll, queue: QueueName.BackgroundTask }) @OnJob({ name: JobName.PartialAssetCleanupQueueAll, queue: QueueName.BackgroundTask })
async removeStaleUploads(): Promise<void> { async removeStaleUploads(): Promise<void> {
// TODO: make this configurable // TODO: make this configurable
const createdBefore = DateTime.now().minus({ days: 7 }).toJSDate(); const createdBefore = DateTime.now().minus({ days: 7 }).toJSDate();
let jobs: JobItem[] = []; let jobs: JobItem[] = [];
const assets = this.assetJobRepository.streamForPartialAssetCleanupJob(createdBefore); const assets = this.assetJobRepository.streamForPartialAssetCleanupJob(createdBefore);
for await (const asset of assets) { for await (const asset of assets) {
jobs.push({ name: JobName.AssetFileMigration, data: asset }); jobs.push({ name: JobName.PartialAssetCleanup, data: asset });
if (jobs.length >= JOBS_ASSET_PAGINATION_SIZE) { if (jobs.length >= JOBS_ASSET_PAGINATION_SIZE) {
await this.jobRepository.queueAll(jobs); await this.jobRepository.queueAll(jobs);
jobs = []; jobs = [];
@ -246,8 +199,8 @@ export class AssetUploadService extends BaseService {
await this.jobRepository.queueAll(jobs); await this.jobRepository.queueAll(jobs);
} }
@OnJob({ name: JobName.PartialAssetDelete, queue: QueueName.BackgroundTask }) @OnJob({ name: JobName.PartialAssetCleanup, queue: QueueName.BackgroundTask })
removeStaleUpload({ id }: JobOf<JobName.PartialAssetDelete>): Promise<JobStatus> { removeStaleUpload({ id }: JobOf<JobName.PartialAssetCleanup>): Promise<JobStatus> {
return this.databaseRepository.withUuidLock(id, async () => { return this.databaseRepository.withUuidLock(id, async () => {
const asset = await this.assetJobRepository.getForPartialAssetCleanupJob(id); const asset = await this.assetJobRepository.getForPartialAssetCleanupJob(id);
if (!asset) { if (!asset) {
@ -268,6 +221,81 @@ export class AssetUploadService extends BaseService {
}); });
} }
async onStart(
auth: AuthDto,
{ assetData, checksum, uploadLength }: StartUploadDto,
): Promise<{ id: string; path: string; status: AssetStatus; isDuplicate: boolean }> {
const assetId = this.cryptoRepository.randomUUID();
const folder = StorageCore.getNestedFolder(StorageFolder.Upload, auth.user.id, assetId);
const extension = extname(assetData.filename);
const path = join(folder, `${assetId}${extension}`);
const type = mimeTypes.assetType(path);
if (type === AssetType.Other) {
throw new BadRequestException(`${assetData.filename} is an unsupported file type`);
}
this.validateQuota(auth, uploadLength);
try {
await this.assetRepository.createWithMetadata(
{
id: assetId,
ownerId: auth.user.id,
libraryId: null,
checksum,
originalPath: path,
deviceAssetId: assetData.deviceAssetId,
deviceId: assetData.deviceId,
fileCreatedAt: assetData.fileCreatedAt,
fileModifiedAt: assetData.fileModifiedAt,
localDateTime: assetData.fileCreatedAt,
type: type,
isFavorite: assetData.isFavorite,
duration: assetData.duration || null,
visibility: AssetVisibility.Hidden,
originalFileName: assetData.filename,
status: AssetStatus.Partial,
},
uploadLength,
assetData.iCloudId ? [{ key: AssetMetadataKey.MobileApp, value: { iCloudId: assetData.iCloudId } }] : undefined,
);
} catch (error: any) {
if (!isAssetChecksumConstraint(error)) {
this.logger.error(`Error creating upload asset record: ${error.message}`);
throw new InternalServerErrorException('Error creating asset');
}
const duplicate = await this.assetRepository.getUploadAssetIdByChecksum(auth.user.id, checksum);
if (!duplicate) {
throw new InternalServerErrorException('Error locating duplicate for checksum constraint');
}
return { id: duplicate.id, path, status: duplicate.status, isDuplicate: true };
}
await this.storageRepository.mkdir(folder);
return { id: assetId, path, status: AssetStatus.Partial, isDuplicate: false };
}
async onComplete({ id, path, fileModifiedAt }: { id: string; path: string; fileModifiedAt: Date }) {
this.logger.debug('Completing upload for asset', id);
const jobData = { name: JobName.AssetExtractMetadata, data: { id: id, source: 'upload' } } as const;
await withRetry(() => this.assetRepository.setComplete(id));
try {
await withRetry(() => this.storageRepository.utimes(path, new Date(), fileModifiedAt));
} catch (error: any) {
this.logger.error(`Failed to update times for ${path}: ${error.message}`);
}
await withRetry(() => this.jobRepository.queue(jobData));
}
async onCancel(assetId: string, path: string): Promise<void> {
this.logger.debug('Cancelling upload for asset', assetId);
await withRetry(() => this.storageRepository.unlink(path));
await withRetry(() => this.assetRepository.removeAndDecrementQuota(assetId));
}
private pipe(req: Readable, res: Response, { id, path, size }: { id: string; path: string; size: number }) { private pipe(req: Readable, res: Response, { id, path, size }: { id: string; path: string; size: number }) {
const writeStream = this.storageRepository.createOrAppendWriteStream(path); const writeStream = this.storageRepository.createOrAppendWriteStream(path);
writeStream.on('error', (error) => { writeStream.on('error', (error) => {
@ -313,24 +341,6 @@ export class AssetUploadService extends BaseService {
return writeStream; return writeStream;
} }
private async onComplete({ id, path, fileModifiedAt }: { id: string; path: string; fileModifiedAt: Date }) {
this.logger.debug('Completing upload for asset', id);
const jobData = { name: JobName.AssetExtractMetadata, data: { id: id, source: 'upload' } } as const;
await withRetry(() => this.assetRepository.setComplete(id));
try {
await withRetry(() => this.storageRepository.utimes(path, new Date(), fileModifiedAt));
} catch (error: any) {
this.logger.error(`Failed to update times for ${path}: ${error.message}`);
}
await withRetry(() => this.jobRepository.queue(jobData));
}
private async onCancel(assetId: string, path: string): Promise<void> {
this.logger.debug('Cancelling upload for asset', assetId);
await withRetry(() => this.storageRepository.unlink(path));
await withRetry(() => this.assetRepository.removeAndDecrementQuota(assetId));
}
private sendInterimResponse({ socket }: Response, location: string, interopVersion: number): void { private sendInterimResponse({ socket }: Response, location: string, interopVersion: number): void {
if (socket && !socket.destroyed) { if (socket && !socket.destroyed) {
// Express doesn't understand interim responses, so write directly to socket // Express doesn't understand interim responses, so write directly to socket

View file

@ -352,8 +352,8 @@ export type JobItem =
| { name: JobName.PersonCleanup; data?: IBaseJob } | { name: JobName.PersonCleanup; data?: IBaseJob }
| { name: JobName.AssetDelete; data: IAssetDeleteJob } | { name: JobName.AssetDelete; data: IAssetDeleteJob }
| { name: JobName.AssetDeleteCheck; data?: IBaseJob } | { name: JobName.AssetDeleteCheck; data?: IBaseJob }
| { name: JobName.PartialAssetDelete; data: IEntityJob } | { name: JobName.PartialAssetCleanup; data: IEntityJob }
| { name: JobName.PartialAssetDeleteQueueAll; data: IBaseJob } | { name: JobName.PartialAssetCleanupQueueAll; data: IBaseJob }
// Library Management // Library Management
| { name: JobName.LibrarySyncFiles; data: ILibraryFileJob } | { name: JobName.LibrarySyncFiles; data: ILibraryFileJob }