mirror of
https://github.com/immich-app/immich
synced 2025-11-14 17:36:12 +00:00
feat(server): Automatic watching of library folders (#6192)
* feat: initial watch support * allow offline files * chore: ignore query errors when resetting e2e db * revert db query * add savepoint * guard the user query * chore: openapi and db migration * wip * support multiple libraries * fix tests * wip * can now cleanup chokidar watchers * fix unit tests * add library watch queue * add missing init from merge * wip * can now filter file extensions * remove watch api from non job client * Fix e2e test * watch library with updated import path and exclusion pattern * add library watch frontend ui * case sensitive watching extensions * can auto watch libraries * move watcher e2e tests to separate file * don't watch libraries from a queue * use event emitters * shorten e2e test timeout * refactor chokidar code to filesystem provider * expose chokidar parameters to config file * fix storage mock * set default config for library watching * add fs provider mocks * cleanup * add more unit tests for watcher * chore: fix format + sql * add more tests * move unwatch feature back to library service * add file event unit tests * chore: formatting * add documentation * fix e2e tests * chore: fix e2e tests * fix library updating * test cleanup * fix typo * cleanup * fixing as per pr comments * reduce library watch config file * update storage config and mocks * move negative event tests to unit tests * fix library watcher e2e * make watch configuration global * remove the feature flag * refactor watcher teardown * fix microservices init * centralize asset scan job queue * improve docs * add more tests * chore: open api * initialize app service * fix docs * fix library watch feature flag * Update docs/docs/features/libraries.md Co-authored-by: Daniel Dietzler <36593685+danieldietzler@users.noreply.github.com> * fix: import right app service * don't be truthy * fix test speling * stricter library update tests * move fs watcher mock to external file * subscribe to config changes * docker does not need polling * make library watch() private * feat: add configuration ui --------- Co-authored-by: Daniel Dietzler <36593685+danieldietzler@users.noreply.github.com> Co-authored-by: Alex Tran <alex.tran1502@gmail.com>
This commit is contained in:
parent
4079e92bbf
commit
068e703e88
48 changed files with 1613 additions and 113 deletions
|
|
@ -1,5 +1,7 @@
|
|||
import { AssetType, LibraryType } from '@app/infra/entities';
|
||||
import { BadRequestException, Inject, Injectable } from '@nestjs/common';
|
||||
import picomatch from 'picomatch';
|
||||
|
||||
import { R_OK } from 'node:constants';
|
||||
import { Stats } from 'node:fs';
|
||||
import path from 'node:path';
|
||||
|
|
@ -11,6 +13,7 @@ import { usePagination, validateCronExpression } from '../domain.util';
|
|||
import { IBaseJob, IEntityJob, ILibraryFileJob, ILibraryRefreshJob, JOBS_ASSET_PAGINATION_SIZE, JobName } from '../job';
|
||||
|
||||
import { ImmichLogger } from '@app/infra/logger';
|
||||
import { EventEmitter } from 'events';
|
||||
import {
|
||||
IAccessRepository,
|
||||
IAssetRepository,
|
||||
|
|
@ -33,11 +36,15 @@ import {
|
|||
} from './library.dto';
|
||||
|
||||
@Injectable()
|
||||
export class LibraryService {
|
||||
export class LibraryService extends EventEmitter {
|
||||
readonly logger = new ImmichLogger(LibraryService.name);
|
||||
private access: AccessCore;
|
||||
private configCore: SystemConfigCore;
|
||||
|
||||
private watchLibraries = false;
|
||||
|
||||
private watchers: Record<string, () => Promise<void>> = {};
|
||||
|
||||
constructor(
|
||||
@Inject(IAccessRepository) accessRepository: IAccessRepository,
|
||||
@Inject(IAssetRepository) private assetRepository: IAssetRepository,
|
||||
|
|
@ -48,6 +55,7 @@ export class LibraryService {
|
|||
@Inject(IStorageRepository) private storageRepository: IStorageRepository,
|
||||
@Inject(IUserRepository) private userRepository: IUserRepository,
|
||||
) {
|
||||
super();
|
||||
this.access = AccessCore.create(accessRepository);
|
||||
this.configCore = SystemConfigCore.create(configRepository);
|
||||
this.configCore.addValidator((config) => {
|
||||
|
|
@ -59,6 +67,7 @@ export class LibraryService {
|
|||
|
||||
async init() {
|
||||
const config = await this.configCore.getConfig();
|
||||
this.watchLibraries = config.library.watch.enabled;
|
||||
this.jobRepository.addCronJob(
|
||||
'libraryScan',
|
||||
config.library.scan.cronExpression,
|
||||
|
|
@ -66,11 +75,128 @@ export class LibraryService {
|
|||
config.library.scan.enabled,
|
||||
);
|
||||
|
||||
this.configCore.config$.subscribe((config) => {
|
||||
if (this.watchLibraries) {
|
||||
await this.watchAll();
|
||||
}
|
||||
|
||||
this.configCore.config$.subscribe(async (config) => {
|
||||
this.jobRepository.updateCronJob('libraryScan', config.library.scan.cronExpression, config.library.scan.enabled);
|
||||
|
||||
if (config.library.watch.enabled !== this.watchLibraries) {
|
||||
this.watchLibraries = config.library.watch.enabled;
|
||||
if (this.watchLibraries) {
|
||||
await this.watchAll();
|
||||
} else {
|
||||
await this.unwatchAll();
|
||||
}
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
private async watch(id: string): Promise<boolean> {
|
||||
if (!this.watchLibraries) {
|
||||
return false;
|
||||
}
|
||||
|
||||
const library = await this.findOrFail(id);
|
||||
|
||||
if (library.type !== LibraryType.EXTERNAL) {
|
||||
throw new BadRequestException('Can only watch external libraries');
|
||||
} else if (library.importPaths.length === 0) {
|
||||
return false;
|
||||
}
|
||||
|
||||
await this.unwatch(id);
|
||||
|
||||
this.logger.log(`Starting to watch library ${library.id} with import path(s) ${library.importPaths}`);
|
||||
|
||||
const matcher = picomatch(`**/*{${mimeTypes.getSupportedFileExtensions().join(',')}}`, {
|
||||
nocase: true,
|
||||
ignore: library.exclusionPatterns,
|
||||
});
|
||||
|
||||
const config = await this.configCore.getConfig();
|
||||
|
||||
this.logger.debug(
|
||||
`Settings for watcher: usePolling: ${config.library.watch.usePolling}, interval: ${config.library.watch.interval}`,
|
||||
);
|
||||
|
||||
const watcher = this.storageRepository.watch(library.importPaths, {
|
||||
usePolling: config.library.watch.usePolling,
|
||||
interval: config.library.watch.interval,
|
||||
binaryInterval: config.library.watch.interval,
|
||||
ignoreInitial: true,
|
||||
});
|
||||
|
||||
this.watchers[id] = async () => {
|
||||
await watcher.close();
|
||||
};
|
||||
|
||||
watcher.on('add', async (path) => {
|
||||
this.logger.debug(`File add event received for ${path} in library ${library.id}}`);
|
||||
if (matcher(path)) {
|
||||
await this.scanAssets(library.id, [path], library.ownerId, false);
|
||||
}
|
||||
this.emit('add', path);
|
||||
});
|
||||
|
||||
watcher.on('change', async (path) => {
|
||||
this.logger.debug(`Detected file change for ${path} in library ${library.id}`);
|
||||
|
||||
if (matcher(path)) {
|
||||
// Note: if the changed file was not previously imported, it will be imported now.
|
||||
await this.scanAssets(library.id, [path], library.ownerId, false);
|
||||
}
|
||||
this.emit('change', path);
|
||||
});
|
||||
|
||||
watcher.on('unlink', async (path) => {
|
||||
this.logger.debug(`Detected deleted file at ${path} in library ${library.id}`);
|
||||
const existingAssetEntity = await this.assetRepository.getByLibraryIdAndOriginalPath(library.id, path);
|
||||
|
||||
if (existingAssetEntity && matcher(path)) {
|
||||
await this.assetRepository.save({ id: existingAssetEntity.id, isOffline: true });
|
||||
}
|
||||
|
||||
this.emit('unlink', path);
|
||||
});
|
||||
|
||||
watcher.on('error', async (error) => {
|
||||
// TODO: should we log, or throw an exception?
|
||||
this.logger.error(`Library watcher for library ${library.id} encountered error: ${error}`);
|
||||
});
|
||||
|
||||
// Wait for the watcher to initialize before returning
|
||||
await new Promise<void>((resolve) => {
|
||||
watcher.on('ready', async () => {
|
||||
resolve();
|
||||
});
|
||||
});
|
||||
|
||||
return true;
|
||||
}
|
||||
|
||||
async unwatch(id: string) {
|
||||
if (this.watchers.hasOwnProperty(id)) {
|
||||
await this.watchers[id]();
|
||||
delete this.watchers[id];
|
||||
}
|
||||
}
|
||||
|
||||
async unwatchAll() {
|
||||
for (const id in this.watchers) {
|
||||
await this.unwatch(id);
|
||||
}
|
||||
}
|
||||
|
||||
async watchAll() {
|
||||
const libraries = await this.repository.getAll(false, LibraryType.EXTERNAL);
|
||||
|
||||
for (const library of libraries) {
|
||||
await this.watch(library.id);
|
||||
}
|
||||
}
|
||||
|
||||
async getStatistics(auth: AuthDto, id: string): Promise<LibraryStatsResponseDto> {
|
||||
await this.access.requirePermission(auth, Permission.LIBRARY_READ, id);
|
||||
return this.repository.getStatistics(id);
|
||||
|
|
@ -117,6 +243,9 @@ export class LibraryService {
|
|||
if (dto.exclusionPatterns && dto.exclusionPatterns.length > 0) {
|
||||
throw new BadRequestException('Upload libraries cannot have exclusion patterns');
|
||||
}
|
||||
if (dto.isWatched) {
|
||||
throw new BadRequestException('Upload libraries cannot be watched');
|
||||
}
|
||||
break;
|
||||
}
|
||||
|
||||
|
|
@ -129,12 +258,38 @@ export class LibraryService {
|
|||
isVisible: dto.isVisible ?? true,
|
||||
});
|
||||
|
||||
this.logger.log(`Creating ${dto.type} library for user ${auth.user.name}`);
|
||||
|
||||
if (dto.type === LibraryType.EXTERNAL && this.watchLibraries) {
|
||||
await this.watch(library.id);
|
||||
}
|
||||
|
||||
return mapLibrary(library);
|
||||
}
|
||||
|
||||
private async scanAssets(libraryId: string, assetPaths: string[], ownerId: string, force = false) {
|
||||
await this.jobRepository.queueAll(
|
||||
assetPaths.map((assetPath) => ({
|
||||
name: JobName.LIBRARY_SCAN_ASSET,
|
||||
data: {
|
||||
id: libraryId,
|
||||
assetPath: path.normalize(assetPath),
|
||||
ownerId,
|
||||
force,
|
||||
},
|
||||
})),
|
||||
);
|
||||
}
|
||||
|
||||
async update(auth: AuthDto, id: string, dto: UpdateLibraryDto): Promise<LibraryResponseDto> {
|
||||
await this.access.requirePermission(auth, Permission.LIBRARY_UPDATE, id);
|
||||
const library = await this.repository.update({ id, ...dto });
|
||||
|
||||
if (dto.importPaths || dto.exclusionPatterns) {
|
||||
// Re-watch library to use new paths and/or exclusion patterns
|
||||
await this.watch(id);
|
||||
}
|
||||
|
||||
return mapLibrary(library);
|
||||
}
|
||||
|
||||
|
|
@ -147,6 +302,10 @@ export class LibraryService {
|
|||
throw new BadRequestException('Cannot delete the last upload library');
|
||||
}
|
||||
|
||||
if (this.watchLibraries) {
|
||||
await this.unwatch(id);
|
||||
}
|
||||
|
||||
await this.repository.softDelete(id);
|
||||
await this.jobRepository.queue({ name: JobName.LIBRARY_DELETE, data: { id } });
|
||||
}
|
||||
|
|
@ -245,8 +404,6 @@ export class LibraryService {
|
|||
|
||||
const deviceAssetId = `${basename(assetPath)}`.replace(/\s+/g, '');
|
||||
|
||||
const pathHash = this.cryptoRepository.hashSha1(`path:${assetPath}`);
|
||||
|
||||
let assetId;
|
||||
if (doImport) {
|
||||
const library = await this.repository.get(job.id, true);
|
||||
|
|
@ -255,6 +412,8 @@ export class LibraryService {
|
|||
return false;
|
||||
}
|
||||
|
||||
const pathHash = this.cryptoRepository.hashSha1(`path:${assetPath}`);
|
||||
|
||||
// TODO: In wait of refactoring the domain asset service, this function is just manually written like this
|
||||
const addedAsset = await this.assetRepository.create({
|
||||
ownerId: job.ownerId,
|
||||
|
|
@ -387,7 +546,7 @@ export class LibraryService {
|
|||
assetPath.match(new RegExp(`^${user.externalPath}`)),
|
||||
);
|
||||
|
||||
this.logger.debug(`Found ${crawledAssetPaths.length} assets when crawling import paths ${library.importPaths}`);
|
||||
this.logger.debug(`Found ${crawledAssetPaths.length} asset(s) when crawling import paths ${library.importPaths}`);
|
||||
const assetsInLibrary = await this.assetRepository.getByLibraryId([job.id]);
|
||||
const onlineFiles = new Set(crawledAssetPaths);
|
||||
const offlineAssetIds = assetsInLibrary
|
||||
|
|
@ -411,17 +570,7 @@ export class LibraryService {
|
|||
this.logger.debug(`Will import ${filteredPaths.length} new asset(s)`);
|
||||
}
|
||||
|
||||
await this.jobRepository.queueAll(
|
||||
filteredPaths.map((assetPath) => ({
|
||||
name: JobName.LIBRARY_SCAN_ASSET,
|
||||
data: {
|
||||
id: job.id,
|
||||
assetPath: path.normalize(assetPath),
|
||||
ownerId: library.ownerId,
|
||||
force: job.refreshAllFiles ?? false,
|
||||
},
|
||||
})),
|
||||
);
|
||||
await this.scanAssets(job.id, filteredPaths, library.ownerId, job.refreshAllFiles ?? false);
|
||||
}
|
||||
|
||||
await this.repository.update({ id: job.id, refreshedAt: new Date() });
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue