set max-age limit

This commit is contained in:
mertalev 2025-10-10 19:26:22 -04:00
parent da52b3ebf4
commit 0ad983135c
No known key found for this signature in database
GPG key ID: DF6ABC77AAD98C95
8 changed files with 71 additions and 80 deletions

View file

@ -4,6 +4,7 @@ import { DateTime } from 'luxon';
import { createHash } from 'node:crypto';
import { extname, join } from 'node:path';
import { Readable } from 'node:stream';
import { SystemConfig } from 'src/config';
import { JOBS_ASSET_PAGINATION_SIZE } from 'src/constants';
import { StorageCore } from 'src/cores/storage.core';
import { OnEvent, OnJob } from 'src/decorators';
@ -54,6 +55,7 @@ export class AssetUploadService extends BaseService {
async startUpload(auth: AuthDto, req: Readable, res: Response, dto: StartUploadDto): Promise<void> {
this.logger.verboseFn(() => `Starting upload: ${JSON.stringify(dto)}`);
const { isComplete, assetData, uploadLength, contentLength, version } = dto;
const { backup } = await this.getConfig({ withCache: true });
const asset = await this.onStart(auth, dto);
if (asset.isDuplicate) {
@ -63,7 +65,7 @@ export class AssetUploadService extends BaseService {
const location = `/api/upload/${asset.id}`;
if (version <= MAX_RUFH_INTEROP_VERSION) {
this.sendInterimResponse(res, location, version);
this.sendInterimResponse(res, location, version, this.getUploadLimits(backup));
}
// this is a 5xx to indicate the client should do offset retrieval and resume
res.status(500).send('Incomplete asset already exists');
@ -76,29 +78,31 @@ export class AssetUploadService extends BaseService {
const location = `/api/upload/${asset.id}`;
if (version <= MAX_RUFH_INTEROP_VERSION) {
this.sendInterimResponse(res, location, version);
this.sendInterimResponse(res, location, version, this.getUploadLimits(backup));
}
this.addRequest(asset.id, req);
let checksumBuffer: Buffer | undefined;
const writeStream = this.pipe(req, asset.path, contentLength);
if (isComplete) {
const hash = createHash('sha1');
req.on('data', (data: Buffer) => hash.update(data));
writeStream.on('finish', () => (checksumBuffer = hash.digest()));
}
await new Promise((resolve, reject) => writeStream.on('close', resolve).on('error', reject));
this.setCompleteHeader(res, dto.version, isComplete);
if (!isComplete) {
res.status(201).set('Location', location).setHeader('Upload-Limit', 'min-size=0').send();
return;
}
if (dto.checksum.compare(checksumBuffer!) !== 0) {
return await this.sendChecksumMismatch(res, asset.id, asset.path);
}
await this.databaseRepository.withUuidLock(asset.id, async () => {
let checksumBuffer: Buffer | undefined;
const writeStream = this.pipe(req, asset.path, contentLength);
if (isComplete) {
const hash = createHash('sha1');
req.on('data', (data: Buffer) => hash.update(data));
writeStream.on('finish', () => (checksumBuffer = hash.digest()));
}
await new Promise((resolve, reject) => writeStream.on('close', resolve).on('error', reject));
this.setCompleteHeader(res, dto.version, isComplete);
if (!isComplete) {
res.status(201).set('Location', location).setHeader('Upload-Limit', this.getUploadLimits(backup)).send();
return;
}
if (dto.checksum.compare(checksumBuffer!) !== 0) {
return await this.sendChecksumMismatch(res, asset.id, asset.path);
}
await this.onComplete({ id: asset.id, path: asset.path, fileModifiedAt: assetData.fileModifiedAt });
res.status(200).send({ id: asset.id });
await this.onComplete({ id: asset.id, path: asset.path, fileModifiedAt: assetData.fileModifiedAt });
res.status(200).send({ id: asset.id });
});
}
resumeUpload(auth: AuthDto, req: Readable, res: Response, id: string, dto: ResumeUploadDto): Promise<void> {
@ -180,6 +184,7 @@ export class AssetUploadService extends BaseService {
async getUploadStatus(auth: AuthDto, res: Response, id: string, { version }: GetUploadStatusDto): Promise<void> {
this.logger.verboseFn(() => `Getting upload status for ${id} with version ${version}`);
const { backup } = await this.getConfig({ withCache: true });
this.abortExistingRequest(id);
return this.databaseRepository.withUuidLock(id, async () => {
const asset = await this.assetRepository.getCompletionMetadata(id, auth.user.id);
@ -194,15 +199,20 @@ export class AssetUploadService extends BaseService {
.status(204)
.setHeader('Upload-Offset', offset.toString())
.setHeader('Cache-Control', 'no-store')
.setHeader('Upload-Limit', 'min-size=0')
.setHeader('Upload-Limit', this.getUploadLimits(backup))
.send();
});
}
async getUploadOptions(res: Response): Promise<void> {
const { backup } = await this.getConfig({ withCache: true });
res.status(204).setHeader('Upload-Limit', this.getUploadLimits(backup)).send();
}
@OnJob({ name: JobName.PartialAssetCleanupQueueAll, queue: QueueName.BackgroundTask })
async removeStaleUploads(): Promise<void> {
const config = await this.getConfig({ withCache: false });
const createdBefore = DateTime.now().minus({ hours: config.nightlyTasks.removeStaleUploads.hoursAgo }).toJSDate();
const createdBefore = DateTime.now().minus({ hours: config.backup.upload.maxAgeHours }).toJSDate();
let jobs: JobItem[] = [];
const assets = this.assetJobRepository.streamForPartialAssetCleanupJob(createdBefore);
for await (const asset of assets) {
@ -353,13 +363,13 @@ export class AssetUploadService extends BaseService {
return writeStream;
}
private sendInterimResponse({ socket }: Response, location: string, interopVersion: number): void {
private sendInterimResponse({ socket }: Response, location: string, interopVersion: number, limits: string): void {
if (socket && !socket.destroyed) {
// Express doesn't understand interim responses, so write directly to socket
socket.write(
'HTTP/1.1 104 Upload Resumption Supported\r\n' +
`Location: ${location}\r\n` +
'Upload-Limit: min-size=0\r\n' +
`Upload-Limit: ${limits}\r\n` +
`Upload-Draft-Interop-Version: ${interopVersion}\r\n\r\n`,
);
}
@ -428,4 +438,8 @@ export class AssetUploadService extends BaseService {
res.setHeader('Upload-Incomplete', isComplete ? '?0' : '?1');
}
}
private getUploadLimits({ upload }: SystemConfig['backup']) {
return `min-size=0, max-age=${upload.maxAgeHours * 3600}`;
}
}