This commit is contained in:
mertalev 2025-10-06 23:32:20 -04:00
parent 6dbcf8b876
commit 12b1a319e9
No known key found for this signature in database
GPG key ID: DF6ABC77AAD98C95
15 changed files with 317 additions and 468 deletions

View file

@ -36,7 +36,7 @@ export class AssetUploadService extends BaseService {
const asset = await this.onStart(auth, dto);
if (asset.isDuplicate) {
if (asset.status !== AssetStatus.Partial) {
return this.sendAlreadyCompletedProblem(res);
return this.sendAlreadyCompleted(res);
}
const location = `/api/upload/${asset.id}`;
@ -49,7 +49,7 @@ export class AssetUploadService extends BaseService {
}
if (isComplete && uploadLength !== contentLength) {
return this.sendInconsistentLengthProblem(res);
return this.sendInconsistentLength(res);
}
const location = `/api/upload/${asset.id}`;
@ -66,25 +66,19 @@ export class AssetUploadService extends BaseService {
req.on('data', (data: Buffer) => hash.update(data));
writeStream.on('finish', () => (checksumBuffer = hash.digest()));
}
await new Promise((resolve, reject) => writeStream.on('finish', resolve).on('close', reject));
this.setCompleteHeader(res, dto.version, isComplete);
if (!isComplete) {
res.status(201).set('Location', location).setHeader('Upload-Limit', 'min-size=0').send();
return;
}
this.logger.log(`Finished upload to ${asset.path}`);
if (dto.checksum.compare(checksumBuffer!) !== 0) {
return await this.sendChecksumMismatch(res, asset.id, asset.path);
}
writeStream.on('finish', () => {
this.setCompleteHeader(res, dto.version, isComplete);
if (!isComplete) {
return res.status(201).set('Location', location).setHeader('Upload-Limit', 'min-size=0').send();
}
this.logger.log(`Finished upload to ${asset.path}`);
if (dto.checksum.compare(checksumBuffer!) !== 0) {
return this.sendChecksumMismatchResponse(res, asset.id, asset.path);
}
this.onComplete(metadata)
.then(() => res.status(200).send({ id: asset.id }))
.catch((error) => {
this.logger.error(`Failed to complete upload for ${asset.id}: ${error.message}`);
res.status(500).send();
});
});
await new Promise((resolve) => writeStream.on('close', resolve));
await this.onComplete(metadata);
res.status(200).send({ id: asset.id });
}
resumeUpload(auth: AuthDto, req: Readable, res: Response, id: string, dto: ResumeUploadDto): Promise<void> {
@ -100,16 +94,16 @@ export class AssetUploadService extends BaseService {
const { fileModifiedAt, path, status, checksum: providedChecksum, size } = completionData;
if (status !== AssetStatus.Partial) {
return this.sendAlreadyCompletedProblem(res);
return this.sendAlreadyCompleted(res);
}
if (uploadLength && size && size !== uploadLength) {
return this.sendInconsistentLengthProblem(res);
return this.sendInconsistentLength(res);
}
const expectedOffset = await this.getCurrentOffset(path);
if (expectedOffset !== uploadOffset) {
return this.sendOffsetMismatchProblem(res, expectedOffset, uploadOffset);
return this.sendOffsetMismatch(res, expectedOffset, uploadOffset);
}
const newLength = uploadOffset + contentLength;
@ -123,28 +117,29 @@ export class AssetUploadService extends BaseService {
return;
}
const metadata = { id, path, size: contentLength, fileModifiedAt: fileModifiedAt };
const metadata = { id, path, size: contentLength, fileModifiedAt };
const writeStream = this.pipe(req, res, metadata);
writeStream.on('finish', async () => {
this.setCompleteHeader(res, version, isComplete);
const currentOffset = await this.getCurrentOffset(path);
if (!isComplete) {
return res.status(204).setHeader('Upload-Offset', currentOffset.toString()).send();
}
this.logger.log(`Finished upload to ${path}`);
const checksum = await this.cryptoRepository.hashFile(path);
if (providedChecksum.compare(checksum) !== 0) {
return this.sendChecksumMismatchResponse(res, id, path);
}
await new Promise((resolve, reject) => writeStream.on('finish', resolve).on('close', reject));
this.setCompleteHeader(res, version, isComplete);
if (!isComplete) {
try {
await this.onComplete(metadata);
} finally {
res.status(200).send({ id });
const offset = await this.getCurrentOffset(path);
res.status(204).setHeader('Upload-Offset', offset.toString()).send();
} catch {
this.logger.error(`Failed to get current offset for ${path} after write`);
res.status(500).send();
}
});
await new Promise((resolve) => writeStream.on('close', resolve));
return;
}
this.logger.log(`Finished upload to ${path}`);
const checksum = await this.cryptoRepository.hashFile(path);
if (providedChecksum.compare(checksum) !== 0) {
return await this.sendChecksumMismatch(res, id, path);
}
await this.onComplete(metadata);
res.status(200).send({ id });
});
}
@ -156,7 +151,7 @@ export class AssetUploadService extends BaseService {
return;
}
if (asset.status !== AssetStatus.Partial) {
return this.sendAlreadyCompletedProblem(res);
return this.sendAlreadyCompleted(res);
}
await this.onCancel(assetId, asset.path);
res.status(204).send();
@ -250,9 +245,8 @@ export class AssetUploadService extends BaseService {
fileCreatedAt: assetData.fileCreatedAt,
fileModifiedAt: assetData.fileModifiedAt,
localDateTime: assetData.fileCreatedAt,
type: type,
type,
isFavorite: assetData.isFavorite,
duration: assetData.duration || null,
visibility: AssetVisibility.Hidden,
originalFileName: assetData.filename,
status: AssetStatus.Partial,
@ -280,7 +274,7 @@ export class AssetUploadService extends BaseService {
async onComplete({ id, path, fileModifiedAt }: { id: string; path: string; fileModifiedAt: Date }) {
this.logger.debug('Completing upload for asset', id);
const jobData = { name: JobName.AssetExtractMetadata, data: { id: id, source: 'upload' } } as const;
const jobData = { name: JobName.AssetExtractMetadata, data: { id, source: 'upload' } } as const;
await withRetry(() => this.assetRepository.setComplete(id));
try {
await withRetry(() => this.storageRepository.utimes(path, new Date(), fileModifiedAt));
@ -317,9 +311,10 @@ export class AssetUploadService extends BaseService {
if (receivedLength + data.length > size) {
writeStream.destroy();
req.destroy();
return this.onCancel(id, path).finally(() =>
res.status(400).send('Received more data than specified in content-length'),
);
void this.onCancel(id, path)
.catch((error: any) => this.logger.error(`Failed to remove ${id} after too much data: ${error.message}`))
.finally(() => res.status(400).send('Received more data than specified in content-length'));
return;
}
receivedLength += data.length;
if (!writeStream.write(data)) {
@ -333,9 +328,9 @@ export class AssetUploadService extends BaseService {
return writeStream.end();
}
writeStream.destroy();
this.onCancel(id, path).finally(() =>
res.status(400).send(`Received ${receivedLength} bytes when expecting ${size}`),
);
void this.onCancel(id, path)
.catch((error: any) => this.logger.error(`Failed to remove ${id} after unexpected length: ${error.message}`))
.finally(() => res.status(400).send(`Received ${receivedLength} bytes when expecting ${size}`));
});
return writeStream;
@ -353,21 +348,21 @@ export class AssetUploadService extends BaseService {
}
}
private sendInconsistentLengthProblem(res: Response): void {
private sendInconsistentLength(res: Response): void {
res.status(400).contentType('application/problem+json').send({
type: 'https://iana.org/assignments/http-problem-types#inconsistent-upload-length',
title: 'inconsistent length values for upload',
});
}
private sendAlreadyCompletedProblem(res: Response): void {
private sendAlreadyCompleted(res: Response): void {
res.status(400).contentType('application/problem+json').send({
type: 'https://iana.org/assignments/http-problem-types#completed-upload',
title: 'upload is already completed',
});
}
private sendOffsetMismatchProblem(res: Response, expected: number, actual: number): void {
private sendOffsetMismatch(res: Response, expected: number, actual: number): void {
res.status(409).contentType('application/problem+json').setHeader('Upload-Offset', expected.toString()).send({
type: 'https://iana.org/assignments/http-problem-types#mismatching-upload-offset',
title: 'offset from request does not match offset of resource',
@ -376,7 +371,7 @@ export class AssetUploadService extends BaseService {
});
}
private sendChecksumMismatchResponse(res: Response, assetId: string, path: string): Promise<void> {
private sendChecksumMismatch(res: Response, assetId: string, path: string) {
this.logger.warn(`Removing upload asset ${assetId} due to checksum mismatch`);
res.status(460).send('File on server does not match provided checksum');
return this.onCancel(assetId, path);