feat(server): lower library scan memory usage (#7939)

* use trie

* update tests

* formatting

* pr feedback

* linting
Author: Mert
Date: 2024-03-14 01:52:30 -04:00 (committed by GitHub)
Commit: d67cc00e4e
Parent: 63d252b603
7 changed files with 143 additions and 62 deletions


@@ -1,6 +1,7 @@
-import { AssetType, LibraryType } from '@app/infra/entities';
+import { AssetType, LibraryEntity, LibraryType } from '@app/infra/entities';
 import { ImmichLogger } from '@app/infra/logger';
 import { BadRequestException, Inject, Injectable } from '@nestjs/common';
+import { Trie } from 'mnemonist';
 import { R_OK } from 'node:constants';
 import { EventEmitter } from 'node:events';
 import { Stats } from 'node:fs';
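
The only new dependency is mnemonist's `Trie`, imported above. The memory win comes from the shape of the data: asset paths within a library share long directory prefixes, and a trie stores each shared prefix once, whereas the previous `Set<string>` held every full path as an independent string. A minimal sketch of the operations the commit relies on (`add`, `has`, `delete`, `size`, and iteration), using made-up paths:

```ts
import { Trie } from 'mnemonist';

const paths = new Trie<string>();
paths.add('/photos/2024/01/a.jpg');
paths.add('/photos/2024/01/b.jpg');
paths.add('/photos/2024/02/c.jpg'); // '/photos/2024/' is stored once, not three times

console.log(paths.size); // 3
console.log(paths.has('/photos/2024/01/a.jpg')); // true
paths.delete('/photos/2024/01/a.jpg');

// The trie is iterable, yielding the stored strings.
for (const p of paths) {
  console.log(p);
}
```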
@@ -11,7 +12,6 @@ import { AuthDto } from '../auth';
 import { mimeTypes } from '../domain.constant';
 import { handlePromiseError, usePagination, validateCronExpression } from '../domain.util';
 import { IBaseJob, IEntityJob, ILibraryFileJob, ILibraryRefreshJob, JOBS_ASSET_PAGINATION_SIZE, JobName } from '../job';
 import {
   DatabaseLock,
   IAccessRepository,
@@ -39,6 +39,8 @@ import {
   mapLibrary,
 } from './library.dto';
 
+const LIBRARY_SCAN_BATCH_SIZE = 5000;
+
 @Injectable()
 export class LibraryService extends EventEmitter {
   readonly logger = new ImmichLogger(LibraryService.name);
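
`LIBRARY_SCAN_BATCH_SIZE` bounds both the database pagination and the chunks handed to `scanAssets`, so buffering no longer grows with library size. A hypothetical standalone helper (not part of the commit) showing the same fixed-size batching pattern that `handleQueueAssetRefresh` below applies inline:

```ts
// Hypothetical helper: accumulate items from any iterable and flush
// whenever the batch reaches `size`, so at most `size` items are
// buffered at a time.
function* batched<T>(items: Iterable<T>, size: number): Generator<T[]> {
  let batch: T[] = [];
  for (const item of items) {
    batch.push(item);
    if (batch.length >= size) {
      yield batch;
      batch = [];
    }
  }
  if (batch.length > 0) {
    yield batch; // flush the remainder
  }
}

// e.g. for (const chunk of batched(crawledAssetPaths, LIBRARY_SCAN_BATCH_SIZE)) { ... }
```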
@@ -626,6 +628,69 @@ export class LibraryService extends EventEmitter {
     this.logger.verbose(`Refreshing library: ${job.id}`);
 
+    const crawledAssetPaths = await this.getPathTrie(library);
+    this.logger.debug(`Found ${crawledAssetPaths.size} asset(s) when crawling import paths ${library.importPaths}`);
+
+    const assetIdsToMarkOffline = [];
+    const assetIdsToMarkOnline = [];
+    const pagination = usePagination(LIBRARY_SCAN_BATCH_SIZE, (pagination) =>
+      this.assetRepository.getLibraryAssetPaths(pagination, library.id),
+    );
+
+    const shouldScanAll = job.refreshAllFiles || job.refreshModifiedFiles;
+    for await (const page of pagination) {
+      for (const asset of page) {
+        const isOffline = !crawledAssetPaths.has(asset.originalPath);
+        if (isOffline && !asset.isOffline) {
+          assetIdsToMarkOffline.push(asset.id);
+        }
+
+        if (!isOffline && asset.isOffline) {
+          assetIdsToMarkOnline.push(asset.id);
+        }
+
+        if (!shouldScanAll) {
+          crawledAssetPaths.delete(asset.originalPath);
+        }
+      }
+    }
+
+    if (assetIdsToMarkOffline.length > 0) {
+      this.logger.debug(`Found ${assetIdsToMarkOffline.length} offline asset(s) previously marked as online`);
+      await this.assetRepository.updateAll(assetIdsToMarkOffline, { isOffline: true });
+    }
+
+    if (assetIdsToMarkOnline.length > 0) {
+      this.logger.debug(`Found ${assetIdsToMarkOnline.length} online asset(s) previously marked as offline`);
+      await this.assetRepository.updateAll(assetIdsToMarkOnline, { isOffline: false });
+    }
+
+    if (crawledAssetPaths.size > 0) {
+      if (!shouldScanAll) {
+        this.logger.debug(`Will import ${crawledAssetPaths.size} new asset(s)`);
+      }
+
+      const batch = [];
+      for (const assetPath of crawledAssetPaths) {
+        batch.push(assetPath);
+
+        if (batch.length >= LIBRARY_SCAN_BATCH_SIZE) {
+          await this.scanAssets(job.id, batch, library.ownerId, job.refreshAllFiles ?? false);
+          batch.length = 0;
+        }
+      }
+
+      if (batch.length > 0) {
+        await this.scanAssets(job.id, batch, library.ownerId, job.refreshAllFiles ?? false);
+      }
+    }
+
+    await this.repository.update({ id: job.id, refreshedAt: new Date() });
+
+    return true;
+  }
+
+  private async getPathTrie(library: LibraryEntity): Promise<Trie<string>> {
     const pathValidation = await Promise.all(
       library.importPaths.map(async (importPath) => await this.validateImportPath(importPath)),
     );
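
The rewritten `handleQueueAssetRefresh` above reconciles the database against the crawled trie in one streaming pass: each known asset is classified as newly offline (no longer on disk) or back online, and, unless a full rescan was requested, its path is removed from the trie, so the paths left over afterwards are exactly the new files to import. A condensed sketch of that classification, with a hypothetical `AssetRow` type standing in for the repository rows:

```ts
import { Trie } from 'mnemonist';

interface AssetRow {
  id: string;
  originalPath: string;
  isOffline: boolean;
}

// crawled: every path found on disk; assets: every path the DB knows about.
function reconcile(assets: Iterable<AssetRow>, crawled: Trie<string>, shouldScanAll: boolean) {
  const toMarkOffline: string[] = [];
  const toMarkOnline: string[] = [];

  for (const asset of assets) {
    const isOffline = !crawled.has(asset.originalPath);
    if (isOffline && !asset.isOffline) {
      toMarkOffline.push(asset.id); // file vanished since the last scan
    }
    if (!isOffline && asset.isOffline) {
      toMarkOnline.push(asset.id); // file reappeared
    }
    if (!shouldScanAll) {
      crawled.delete(asset.originalPath); // leave only unseen paths behind
    }
  }

  // After the pass, `crawled` holds only paths with no DB row: the new files.
  return { toMarkOffline, toMarkOnline };
}
```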
@@ -640,61 +705,17 @@ export class LibraryService extends EventEmitter {
       .filter((validation) => validation.isValid)
       .map((validation) => validation.importPath);
 
-    let rawPaths = await this.storageRepository.crawl({
+    const generator = this.storageRepository.walk({
       pathsToCrawl: validImportPaths,
       exclusionPatterns: library.exclusionPatterns,
     });
-    const crawledAssetPaths = new Set<string>(rawPaths);
-
-    const shouldScanAll = job.refreshAllFiles || job.refreshModifiedFiles;
-    let pathsToScan: string[] = shouldScanAll ? rawPaths : [];
-    rawPaths = [];
-
-    this.logger.debug(`Found ${crawledAssetPaths.size} asset(s) when crawling import paths ${library.importPaths}`);
-
-    const assetIdsToMarkOffline = [];
-    const assetIdsToMarkOnline = [];
-    const pagination = usePagination(5000, (pagination) =>
-      this.assetRepository.getLibraryAssetPaths(pagination, library.id),
-    );
-
-    for await (const page of pagination) {
-      for (const asset of page) {
-        const isOffline = !crawledAssetPaths.has(asset.originalPath);
-        if (isOffline && !asset.isOffline) {
-          assetIdsToMarkOffline.push(asset.id);
-        }
-
-        if (!isOffline && asset.isOffline) {
-          assetIdsToMarkOnline.push(asset.id);
-        }
-
-        crawledAssetPaths.delete(asset.originalPath);
-      }
+    const trie = new Trie<string>();
+    for await (const filePath of generator) {
+      trie.add(filePath);
     }
 
-    if (assetIdsToMarkOffline.length > 0) {
-      this.logger.debug(`Found ${assetIdsToMarkOffline.length} offline asset(s) previously marked as online`);
-      await this.assetRepository.updateAll(assetIdsToMarkOffline, { isOffline: true });
-    }
-
-    if (assetIdsToMarkOnline.length > 0) {
-      this.logger.debug(`Found ${assetIdsToMarkOnline.length} online asset(s) previously marked as offline`);
-      await this.assetRepository.updateAll(assetIdsToMarkOnline, { isOffline: false });
-    }
-
-    if (!shouldScanAll) {
-      pathsToScan = [...crawledAssetPaths];
-      this.logger.debug(`Will import ${pathsToScan.length} new asset(s)`);
-    }
-
-    if (pathsToScan.length > 0) {
-      await this.scanAssets(job.id, pathsToScan, library.ownerId, job.refreshAllFiles ?? false);
-    }
-
-    await this.repository.update({ id: job.id, refreshedAt: new Date() });
-
-    return true;
+    return trie;
   }
 
   private async findOrFail(id: string) {
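
The net effect of this last hunk: `crawl()` returned the whole file list as a `string[]` before anything could be filtered, while `walk()` is consumed as an async generator, so paths stream into the trie one at a time and the flat array never exists. The repository's actual `walk` implementation isn't part of this diff; a hypothetical stand-in built on `node:fs/promises` illustrates the shape (exclusion patterns and media-type filtering omitted):

```ts
import { opendir } from 'node:fs/promises';
import { join } from 'node:path';

// Hypothetical stand-in for storageRepository.walk(): yields one path at
// a time instead of materializing the whole directory tree as an array.
async function* walk(dir: string): AsyncGenerator<string> {
  const handle = await opendir(dir);
  for await (const entry of handle) {
    const path = join(dir, entry.name);
    if (entry.isDirectory()) {
      yield* walk(path);
    } else if (entry.isFile()) {
      yield path;
    }
  }
}

// Consumed the same way the commit consumes the real generator:
// const trie = new Trie<string>();
// for await (const filePath of walk('/photos')) {
//   trie.add(filePath);
// }
```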