tweaks

shared pipe method
require size upfront
make length optional for patch requests

parent fb192bd310
commit 0a955e21b6
4 changed files with 138 additions and 162 deletions
```diff
@@ -1,7 +1,8 @@
-import { BadRequestException, Injectable, InternalServerErrorException } from '@nestjs/common';
+import { BadRequestException, Injectable } from '@nestjs/common';
 import { Response } from 'express';
 import { createHash } from 'node:crypto';
 import { extname, join } from 'node:path';
+import { Readable } from 'node:stream';
 import { StorageCore } from 'src/cores/storage.core';
 import { AuthDto } from 'src/dtos/auth.dto';
 import { GetUploadStatusDto, ResumeUploadDto, StartUploadDto } from 'src/dtos/upload.dto';
```
```diff
@@ -19,7 +20,7 @@ export class AssetUploadService extends BaseService {
   async startUpload(req: AuthenticatedRequest, res: Response, dto: StartUploadDto): Promise<void> {
     this.logger.verboseFn(() => `Starting upload: ${JSON.stringify(dto)}`);
     const { isComplete, assetData, uploadLength, contentLength, version } = dto;
-    if (isComplete && uploadLength !== undefined && uploadLength !== contentLength) {
+    if (isComplete && uploadLength && uploadLength !== contentLength) {
       return this.sendInconsistentLengthProblem(res);
     }
 
```
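A note on the guard above: switching from `uploadLength !== undefined` to a plain truthiness test means an explicit `Upload-Length: 0` is now treated like an absent header rather than as a mismatch against the body length. A minimal sketch of the difference, using standalone values rather than the service's DTO:

```ts
// Sketch: how the two guards differ for the edge case uploadLength === 0.
// Standalone illustration, not the service code itself.
const check = (uploadLength: number | undefined, contentLength: number) => ({
  strict: uploadLength !== undefined && uploadLength !== contentLength, // flags 0 vs a non-empty body
  truthy: Boolean(uploadLength) && uploadLength !== contentLength, // skips the check when 0
});

console.log(check(0, 1024)); // { strict: true, truthy: false }
console.log(check(undefined, 1024)); // { strict: false, truthy: false }
```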
```diff
@@ -48,20 +49,22 @@ export class AssetUploadService extends BaseService {
           fileCreatedAt: assetData.fileCreatedAt,
           fileModifiedAt: assetData.fileModifiedAt,
           localDateTime: assetData.fileCreatedAt,
-          type: mimeTypes.assetType(path),
+          type: type,
           isFavorite: assetData.isFavorite,
           duration: assetData.duration || null,
           visibility: assetData.visibility || AssetVisibility.Timeline,
           originalFileName: assetData.filename,
           status: AssetStatus.Partial,
         },
+        uploadLength,
         assetData.metadata,
       );
     } catch (error: any) {
       if (isAssetChecksumConstraint(error)) {
         const duplicate = await this.assetRepository.getUploadAssetIdByChecksum(req.auth.user.id, dto.checksum);
         if (!duplicate) {
-          throw new InternalServerErrorException('Error locating duplicate for checksum constraint');
+          res.status(500).send('Error locating duplicate for checksum constraint');
+          return;
         }
 
         if (duplicate.status !== AssetStatus.Partial) {
```
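The error path above stops throwing `InternalServerErrorException` and writes the 500 directly, presumably to keep error reporting on the response object this handler already manages. The surrounding insert-then-look-up dedupe pattern, sketched with hypothetical repository methods (the real lookup is `getUploadAssetIdByChecksum`; the create call's name here is assumed):

```ts
// Sketch of deduplication via a unique checksum constraint: try the insert,
// and on a constraint violation resolve the existing row instead.
interface Repo {
  create(dto: { ownerId: string; checksum: Buffer }): Promise<{ id: string }>;
  findByChecksum(ownerId: string, checksum: Buffer): Promise<{ id: string } | undefined>;
}

async function createOrFindDuplicate(repo: Repo, ownerId: string, checksum: Buffer) {
  try {
    return { id: (await repo.create({ ownerId, checksum })).id, duplicate: false };
  } catch (error) {
    // A unique-constraint violation means a row with this checksum already exists.
    const existing = await repo.findByChecksum(ownerId, checksum);
    if (!existing) {
      throw error; // the constraint fired but the row is gone: surface the failure
    }
    return { id: existing.id, duplicate: true };
  }
}
```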
```diff
@@ -83,68 +86,31 @@ export class AssetUploadService extends BaseService {
 
     await this.storageRepository.mkdir(folder);
     let checksumBuffer: Buffer | undefined;
-    const writeStream = this.storageRepository.createWriteStream(path);
+    const metadata = { id: assetId, path, size: contentLength, fileModifiedAt: assetData.fileModifiedAt };
+    const writeStream = this.pipe(req, res, metadata);
 
     if (isComplete) {
       const hash = createHash('sha1');
-      req.on('data', (chunk: Buffer) => hash.update(chunk));
+      req.on('data', (data: Buffer) => hash.update(data));
       writeStream.on('finish', () => (checksumBuffer = hash.digest()));
     }
 
-    writeStream.on('error', (error) => {
-      this.logger.error(`Failed to write chunk to ${path}: ${error.message}`);
-      if (!res.headersSent) {
-        res.status(500).setHeader('Location', location).send();
-      }
-    });
-
-    writeStream.on('finish', async () => {
+    writeStream.on('finish', () => {
       this.setCompleteHeader(res, dto.version, isComplete);
       if (!isComplete) {
-        return res.status(201).setHeader('Location', location).setHeader('Upload-Limit', 'min-size=0').send();
+        return res.status(201).send();
       }
       this.logger.log(`Finished upload to ${path}`);
       if (dto.checksum.compare(checksumBuffer!) !== 0) {
         return this.sendChecksumMismatchResponse(res, assetId, path);
       }
 
-      try {
-        await this.onComplete({ assetId, path, size: contentLength, fileModifiedAt: assetData.fileModifiedAt });
-      } finally {
-        this.setCompleteHeader(res, dto.version, true);
-        res.status(200).setHeader('Location', location).setHeader('Upload-Limit', 'min-size=0').send();
-      }
-    });
-
-    req.on('error', (error) => {
-      this.logger.error(`Failed to read request body: ${error.message}`);
-      writeStream.end();
-      if (!res.headersSent) {
-        res.status(500).setHeader('Location', location).send();
-      }
-    });
-
-    let receivedLength = 0;
-    req.on('data', (chunk: Buffer) => {
-      if (receivedLength + chunk.length > contentLength) {
-        writeStream.destroy();
-        req.destroy();
-        res.status(400).send('Received more data than specified in content-length');
-        return this.onCancel(assetId, path);
-      }
-      receivedLength += chunk.length;
-      if (!writeStream.write(chunk)) {
-        req.pause();
-        writeStream.once('drain', () => req.resume());
-      }
-    });
-
-    req.on('end', () => {
-      if (receivedLength === contentLength) {
-        return writeStream.end();
-      }
-      this.logger.error(`Received ${receivedLength} bytes when expecting ${contentLength} for ${assetId}`);
-      writeStream.destroy();
-      this.onCancel(assetId, path);
+      this.onComplete(metadata)
+        .then(() => res.status(200).send())
+        .catch((error) => {
+          this.logger.error(`Failed to complete upload for ${assetId}: ${error.message}`);
+          res.status(500).send();
+        });
     });
+    await new Promise((resolve) => writeStream.on('close', resolve));
   }
```
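The long removed block is Node's manual backpressure loop: count bytes, reject overruns, pause the request when `write()` returns `false`, and resume on `'drain'`. That logic now lives in the shared `pipe()` helper shown further down. A self-contained sketch of the same pattern with plain streams, under the assumption that the caller wants a hard byte budget (which is why `stream.pipeline` is not used as-is):

```ts
import { Readable, Writable } from 'node:stream';

// Copy source into sink with manual backpressure and an upper byte limit.
function copyWithLimit(source: Readable, sink: Writable, limit: number): Promise<number> {
  return new Promise((resolve, reject) => {
    let received = 0;
    source.on('data', (chunk: Buffer) => {
      received += chunk.length;
      if (received > limit) {
        source.destroy();
        sink.destroy();
        return reject(new Error(`received more than the declared ${limit} bytes`));
      }
      if (!sink.write(chunk)) {
        source.pause(); // the sink's internal buffer is full
        sink.once('drain', () => source.resume()); // resume once it flushes
      }
    });
    source.on('end', () => sink.end(() => resolve(received)));
    source.on('error', reject);
    sink.on('error', reject);
  });
}

// usage sketch: await copyWithLimit(Readable.from([Buffer.from('abc')]), someWritable, 1024);
```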
```diff
@@ -152,30 +118,25 @@ export class AssetUploadService extends BaseService {
   resumeUpload(req: AuthenticatedRequest, res: Response, id: string, dto: ResumeUploadDto): Promise<void> {
     this.logger.verboseFn(() => `Resuming upload for ${id}: ${JSON.stringify(dto)}`);
     const { isComplete, uploadLength, uploadOffset, contentLength, version } = dto;
     if (isComplete && uploadLength !== undefined && uploadLength !== contentLength) {
       this.sendInconsistentLengthProblem(res);
       return Promise.resolve();
     }
 
     if (version && version >= 6 && req.headers['content-type'] !== 'application/partial-upload') {
       throw new BadRequestException('Content-Type must be application/partial-upload for PATCH requests');
     }
 
     return this.databaseRepository.withUuidLock(id, async () => {
-      const asset = await this.assetRepository.getCompletionMetadata(id, req.auth.user.id);
-      if (!asset) {
+      const completionData = await this.assetRepository.getCompletionMetadata(id, req.auth.user.id);
+      if (!completionData) {
         res.status(404).send('Asset not found');
         return;
       }
+      const { fileModifiedAt, path, status, checksum: providedChecksum, size } = completionData;
 
-      if (asset.status !== AssetStatus.Partial) {
+      if (status !== AssetStatus.Partial) {
         this.setCompleteHeader(res, version, false);
         return this.sendAlreadyCompletedProblem(res);
       }
-      if (uploadOffset === null) {
-        throw new BadRequestException('Missing Upload-Offset header');
+
+      if (uploadLength && size && size !== uploadLength) {
+        this.setCompleteHeader(res, version, false);
+        return this.sendInconsistentLengthProblem(res);
       }
 
-      const { path } = asset;
       const expectedOffset = await this.getCurrentOffset(path);
       if (expectedOffset !== uploadOffset) {
         this.setCompleteHeader(res, version, false);
```
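The resume path now tolerates a missing `Upload-Length` on PATCH and instead cross-checks it against the size recorded at start when both are present, before verifying that the client's `Upload-Offset` matches the bytes already on disk. A sketch of the validation order implied above, with hypothetical plain-data inputs standing in for the DTO and the database row:

```ts
// Sketch of the PATCH validation order. The inputs are hypothetical plain
// values; the service reads them from the request headers and from the
// asset row fetched under the UUID lock.
interface PatchCheck {
  storedSize?: number; // length recorded when the upload started, if any
  uploadLength?: number; // Upload-Length on this PATCH, now optional
  uploadOffset: number; // Upload-Offset header
  currentOffset: number; // bytes already written to disk
}

function validatePatch({ storedSize, uploadLength, uploadOffset, currentOffset }: PatchCheck): string | undefined {
  if (uploadLength && storedSize && storedSize !== uploadLength) {
    return 'inconsistent length'; // the declared length changed between requests
  }
  if (uploadOffset !== currentOffset) {
    return 'offset mismatch'; // the client must resume exactly where the file ends
  }
  return undefined; // safe to append this chunk
}
```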
```diff
@@ -183,92 +144,103 @@ export class AssetUploadService extends BaseService {
       }
 
       const newLength = uploadOffset + contentLength;
 
       // If upload length is provided, validate we're not exceeding it
       if (uploadLength !== undefined && newLength > uploadLength) {
         this.setCompleteHeader(res, version, false);
         res.status(400).send('Upload would exceed declared length');
         return;
       }
 
       this.validateQuota(req.auth, newLength);
 
       // Empty PATCH without Upload-Complete
       if (contentLength === 0 && !isComplete) {
         this.setCompleteHeader(res, version, false);
         res.status(204).setHeader('Upload-Offset', expectedOffset.toString()).send();
         return;
       }
 
-      const writeStream = this.storageRepository.createOrAppendWriteStream(path);
-      let receivedLength = 0;
-
-      writeStream.on('error', (error) => {
-        this.logger.error(`Failed to write chunk to ${path}: ${error.message}`);
-        if (!res.headersSent) {
-          res.status(500).send('Failed to write chunk');
-        }
-      });
-
+      const metadata = { id, path, size: contentLength, fileModifiedAt: fileModifiedAt };
+      const writeStream = this.pipe(req, res, metadata);
       writeStream.on('finish', async () => {
+        this.setCompleteHeader(res, version, isComplete);
         const currentOffset = await this.getCurrentOffset(path);
         if (!isComplete) {
-          this.setCompleteHeader(res, version, false);
           return res.status(204).setHeader('Upload-Offset', currentOffset.toString()).send();
         }
 
         this.logger.log(`Finished upload to ${path}`);
         const checksum = await this.cryptoRepository.hashFile(path);
-        if (asset.checksum.compare(checksum) !== 0) {
+        if (providedChecksum.compare(checksum) !== 0) {
           return this.sendChecksumMismatchResponse(res, id, path);
         }
 
         try {
-          await this.onComplete({ assetId: id, path, size: currentOffset, fileModifiedAt: asset.fileModifiedAt });
+          await this.onComplete(metadata);
         } finally {
-          this.setCompleteHeader(res, version, true);
-          res.status(200).setHeader('Upload-Offset', currentOffset.toString()).send();
+          res.status(200).send();
         }
       });
-
-      req.on('data', (chunk: Buffer) => {
-        if (receivedLength + chunk.length > contentLength) {
-          this.logger.error(`Received more data than specified in content-length for upload to ${path}`);
-          writeStream.destroy();
-          req.destroy();
-          res.status(400).send('Received more data than specified in content-length');
-          return this.onCancel(id, path);
-        }
-
-        receivedLength += chunk.length;
-        if (!writeStream.write(chunk)) {
-          req.pause();
-          writeStream.once('drain', () => req.resume());
-        }
-      });
-
-      req.on('end', () => {
-        if (receivedLength === contentLength) {
-          return writeStream.end();
-        }
-        this.logger.error(`Received ${receivedLength} bytes when expecting ${contentLength} for ${id}`);
-        writeStream.destroy();
-        return this.onCancel(id, path);
-      });
+      await new Promise((resolve) => writeStream.on('close', resolve));
     });
   }
 
-  async cancelUpload(auth: AuthDto, assetId: string, response: Response): Promise<void> {
-    const asset = await this.assetRepository.getCompletionMetadata(assetId, auth.user.id);
-    if (!asset) {
-      response.status(404).send('Asset not found');
-      return;
-    }
-    if (asset.status !== AssetStatus.Partial) {
-      return this.sendAlreadyCompletedProblem(response);
-    }
-    await this.onCancel(assetId, asset.path);
-    response.status(204).send();
+  private pipe(req: Readable, res: Response, { id, path, size }: { id: string; path: string; size: number }) {
+    const writeStream = this.storageRepository.createOrAppendWriteStream(path);
+    writeStream.on('error', (error) => {
+      this.logger.error(`Failed to write chunk to ${path}: ${error.message}`);
+      if (!res.headersSent) {
+        res.status(500).send();
+      }
+    });
+
+    req.on('error', (error) => {
+      this.logger.error(`Failed to read request body: ${error.message}`);
+      if (!res.headersSent) {
+        res.status(500).send();
+      }
+    });
+
+    let receivedLength = 0;
+    req.on('data', (data: Buffer) => {
+      if (receivedLength + data.length > size) {
+        writeStream.destroy();
+        req.destroy();
+        return this.onCancel(id, path).finally(() =>
+          res.status(400).send('Received more data than specified in content-length'),
+        );
+      }
+      receivedLength += data.length;
+      if (!writeStream.write(data)) {
+        req.pause();
+        writeStream.once('drain', () => req.resume());
+      }
+    });
+
+    req.on('end', () => {
+      if (receivedLength === size) {
+        return writeStream.end();
+      }
+      writeStream.destroy();
+      this.onCancel(id, path).finally(() =>
+        res.status(400).send(`Received ${receivedLength} bytes when expecting ${size}`),
+      );
+    });
+
+    return writeStream;
+  }
+
+  cancelUpload(auth: AuthDto, assetId: string, response: Response): Promise<void> {
+    return this.databaseRepository.withUuidLock(assetId, async () => {
+      const asset = await this.assetRepository.getCompletionMetadata(assetId, auth.user.id);
+      if (!asset) {
+        response.status(404).send('Asset not found');
+        return;
+      }
+      if (asset.status !== AssetStatus.Partial) {
+        return this.sendAlreadyCompletedProblem(response);
+      }
+      await this.onCancel(assetId, asset.path);
+      response.status(204).send();
+    });
   }
 
   async getUploadStatus(auth: AuthDto, res: Response, id: string, { version }: GetUploadStatusDto): Promise<void> {
```
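The new `pipe()` helper centralizes write errors, read errors, byte accounting against the declared size, and cleanup via `onCancel`, and it returns the write stream so each caller can attach its own `'finish'` handler. Awaiting `'close'` rather than `'finish'` matters because `'close'` fires after normal completion and after `destroy()` alike, so the HTTP handler stays alive on every path. A sketch of that caller pattern, with `PassThrough` standing in for the real file stream:

```ts
import { once } from 'node:events';
import { PassThrough, Readable } from 'node:stream';

// Sketch of the caller pattern around pipe(): the success path hangs off
// 'finish', while the handler itself awaits 'close'.
async function handleUpload(req: Readable): Promise<void> {
  const writeStream = new PassThrough(); // stand-in for createOrAppendWriteStream
  writeStream.on('finish', () => {
    // respond 200/201/204 here, depending on Upload-Complete
  });
  req.pipe(writeStream); // the service feeds 'data' events manually instead
  await once(writeStream, 'close'); // resolves on finish or destroy
}

// usage: await handleUpload(Readable.from([Buffer.from('chunk')]));
```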
```diff
@@ -280,9 +252,7 @@ export class AssetUploadService extends BaseService {
     }
 
     const offset = await this.getCurrentOffset(asset.path);
-    const isComplete = asset.status !== AssetStatus.Partial;
-
-    this.setCompleteHeader(res, version, isComplete);
+    this.setCompleteHeader(res, version, asset.status !== AssetStatus.Partial);
     res
       .status(204)
       .setHeader('Upload-Offset', offset.toString())
```
```diff
@@ -296,11 +266,10 @@ export class AssetUploadService extends BaseService {
     response.status(204).setHeader('Upload-Limit', 'min-size=0').setHeader('Allow', 'POST, OPTIONS').send();
   }
 
-  private async onComplete(data: { assetId: string; path: string; size: number; fileModifiedAt: Date }): Promise<void> {
-    const { assetId, path, size, fileModifiedAt } = data;
-    this.logger.debug('Completing upload for asset', assetId);
-    const jobData = { name: JobName.AssetExtractMetadata, data: { id: assetId, source: 'upload' } } as const;
-    await withRetry(() => this.assetRepository.setCompleteWithSize(assetId, size));
+  private async onComplete({ id, path, fileModifiedAt }: { id: string; path: string; fileModifiedAt: Date }) {
+    this.logger.debug('Completing upload for asset', id);
+    const jobData = { name: JobName.AssetExtractMetadata, data: { id: id, source: 'upload' } } as const;
+    await withRetry(() => this.assetRepository.setCompleteWithSize(id));
     try {
       await withRetry(() => this.storageRepository.utimes(path, new Date(), fileModifiedAt));
     } catch (error: any) {
```
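`onComplete` no longer receives a size because the full length is now recorded when the upload starts ("require size upfront"), so `setCompleteWithSize` only has to flip the asset out of `Partial`. The `withRetry` helper is not part of this diff; a generic wrapper of that shape might look like the following, which is a hypothetical sketch rather than the project's actual implementation:

```ts
// Hypothetical sketch of a withRetry helper of the shape used above; the
// real one lives elsewhere in the codebase and may differ.
async function withRetry<T>(fn: () => Promise<T>, attempts = 3, delayMs = 100): Promise<T> {
  let lastError: unknown;
  for (let attempt = 1; attempt <= attempts; attempt++) {
    try {
      return await fn();
    } catch (error) {
      lastError = error;
      if (attempt < attempts) {
        // linear backoff between attempts
        await new Promise((resolve) => setTimeout(resolve, delayMs * attempt));
      }
    }
  }
  throw lastError;
}
```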
```diff
@@ -321,6 +290,7 @@ export class AssetUploadService extends BaseService {
     socket.write(
       'HTTP/1.1 104 Upload Resumption Supported\r\n' +
         `Location: ${location}\r\n` +
+        `Upload-Limit: min-size=0\r\n` +
         `Upload-Draft-Interop-Version: ${interopVersion}\r\n\r\n`,
     );
   }
```
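The 104 interim response is written straight to the socket because there is no convenient API for an arbitrary 1xx informational status like 104; the real response for the request still follows on the same connection, and this change additionally advertises `Upload-Limit` during the interim response. A minimal sketch with Node's plain `http` server:

```ts
import { createServer } from 'node:http';

const server = createServer((req, res) => {
  // Emit the interim 1xx by hand; the header block ends with a blank line
  // (\r\n\r\n), and the final response still follows afterwards.
  req.socket.write('HTTP/1.1 104 Upload Resumption Supported\r\n' + 'Upload-Limit: min-size=0\r\n\r\n');
  res.writeHead(204).end(); // the real status goes out after the interim one
});

server.listen(0); // usage sketch; a client sees both responses in order
```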