refactor(server): library syncing (#12220)

* refactor: library scanning

fix tests

remove offline files step

cleanup library service

improve tests

cleanup tests

add db migration

fix e2e

cleanup openapi

fix tests

fix tests

update docs

update docs

update mobile code

fix formatting

don't remove assets from library with invalid import path

use trash for offline files

add migration

simplify scan endpoint

cleanup library panel

fix library tests

e2e lint

fix e2e

trash e2e

fix lint

add asset trash tests

add more tests

ensure thumbs are generated

cleanup svelte

cleanup queue names

fix tests

fix lint

add warning due to trash

fix trash tests

fix lint

fix tests

Admin message for offline asset

fix comments

Update web/src/lib/components/asset-viewer/asset-viewer-nav-bar.svelte

Co-authored-by: Daniel Dietzler <36593685+danieldietzler@users.noreply.github.com>

add permission to library scan endpoint

revert asset interface sort

add trash reason to shared link stub

improve path view in offline

update docs

improve trash performance

fix comments

remove stray comment

* refactor: add back isOffline and remove trashReason from asset, change sync job flow

* chore(server): drop coverage to 80% for functions

* chore: rebase and generated files

---------

Co-authored-by: Zack Pollard <zackpollard@ymail.com>
This commit is contained in:
Jonathan Jogenfors 2024-09-25 19:26:19 +02:00 committed by GitHub
parent 1ef2834603
commit b2f2be3485
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
85 changed files with 941 additions and 1926 deletions

View file

@ -1,6 +1,5 @@
import { BadRequestException, Inject, Injectable } from '@nestjs/common';
import { R_OK } from 'node:constants';
import { Stats } from 'node:fs';
import path, { basename, parse } from 'node:path';
import picomatch from 'picomatch';
import { StorageCore } from 'src/cores/storage.core';
@ -10,27 +9,26 @@ import {
CreateLibraryDto,
LibraryResponseDto,
LibraryStatsResponseDto,
ScanLibraryDto,
mapLibrary,
UpdateLibraryDto,
ValidateLibraryDto,
ValidateLibraryImportPathResponseDto,
ValidateLibraryResponseDto,
mapLibrary,
} from 'src/dtos/library.dto';
import { AssetEntity } from 'src/entities/asset.entity';
import { LibraryEntity } from 'src/entities/library.entity';
import { AssetType } from 'src/enum';
import { IAssetRepository, WithProperty } from 'src/interfaces/asset.interface';
import { IAssetRepository } from 'src/interfaces/asset.interface';
import { ICryptoRepository } from 'src/interfaces/crypto.interface';
import { DatabaseLock, IDatabaseRepository } from 'src/interfaces/database.interface';
import { ArgOf } from 'src/interfaces/event.interface';
import {
IBaseJob,
IEntityJob,
IJobRepository,
ILibraryAssetJob,
ILibraryFileJob,
ILibraryOfflineJob,
ILibraryRefreshJob,
JOBS_LIBRARY_PAGINATION_SIZE,
JobName,
JOBS_LIBRARY_PAGINATION_SIZE,
JobStatus,
} from 'src/interfaces/job.interface';
import { ILibraryRepository } from 'src/interfaces/library.interface';
@ -78,11 +76,7 @@ export class LibraryService {
this.jobRepository.addCronJob(
'libraryScan',
scan.cronExpression,
() =>
handlePromiseError(
this.jobRepository.queue({ name: JobName.LIBRARY_QUEUE_SCAN_ALL, data: { force: false } }),
this.logger,
),
() => handlePromiseError(this.jobRepository.queue({ name: JobName.LIBRARY_QUEUE_SYNC_ALL }), this.logger),
scan.enabled,
);
@ -143,7 +137,7 @@ export class LibraryService {
const handler = async () => {
this.logger.debug(`File add event received for ${path} in library ${library.id}}`);
if (matcher(path)) {
await this.scanAssets(library.id, [path], library.ownerId, false);
await this.syncFiles(library, [path]);
}
};
return handlePromiseError(handler(), this.logger);
@ -151,9 +145,13 @@ export class LibraryService {
onChange: (path) => {
const handler = async () => {
this.logger.debug(`Detected file change for ${path} in library ${library.id}`);
const asset = await this.assetRepository.getByLibraryIdAndOriginalPath(library.id, path);
if (asset) {
await this.syncAssets(library, [asset.id]);
}
if (matcher(path)) {
// Note: if the changed file was not previously imported, it will be imported now.
await this.scanAssets(library.id, [path], library.ownerId, false);
await this.syncFiles(library, [path]);
}
};
return handlePromiseError(handler(), this.logger);
@ -162,8 +160,8 @@ export class LibraryService {
const handler = async () => {
this.logger.debug(`Detected deleted file at ${path} in library ${library.id}`);
const asset = await this.assetRepository.getByLibraryIdAndOriginalPath(library.id, path);
if (asset && matcher(path)) {
await this.assetRepository.update({ id: asset.id, isOffline: true });
if (asset) {
await this.syncAssets(library, [asset.id]);
}
};
return handlePromiseError(handler(), this.logger);
@ -216,7 +214,7 @@ export class LibraryService {
async getStatistics(id: string): Promise<LibraryStatsResponseDto> {
const statistics = await this.repository.getStatistics(id);
if (!statistics) {
throw new BadRequestException('Library not found');
throw new BadRequestException(`Library ${id} not found`);
}
return statistics;
}
@ -250,20 +248,28 @@ export class LibraryService {
return mapLibrary(library);
}
private async scanAssets(libraryId: string, assetPaths: string[], ownerId: string, force = false) {
private async syncFiles({ id, ownerId }: LibraryEntity, assetPaths: string[]) {
await this.jobRepository.queueAll(
assetPaths.map((assetPath) => ({
name: JobName.LIBRARY_SCAN_ASSET,
name: JobName.LIBRARY_SYNC_FILE,
data: {
id: libraryId,
id,
assetPath,
ownerId,
force,
},
})),
);
}
private async syncAssets({ importPaths, exclusionPatterns }: LibraryEntity, assetIds: string[]) {
await this.jobRepository.queueAll(
assetIds.map((assetId) => ({
name: JobName.LIBRARY_SYNC_ASSET,
data: { id: assetId, importPaths, exclusionPatterns },
})),
);
}
private async validateImportPath(importPath: string): Promise<ValidateLibraryImportPathResponseDto> {
const validation = new ValidateLibraryImportPathResponseDto();
validation.importPath = importPath;
@ -366,258 +372,182 @@ export class LibraryService {
return JobStatus.SUCCESS;
}
async handleAssetRefresh(job: ILibraryFileJob): Promise<JobStatus> {
async handleSyncFile(job: ILibraryFileJob): Promise<JobStatus> {
// Only needs to handle new assets
const assetPath = path.normalize(job.assetPath);
const existingAssetEntity = await this.assetRepository.getByLibraryIdAndOriginalPath(job.id, assetPath);
let stats: Stats;
try {
stats = await this.storageRepository.stat(assetPath);
} catch (error: Error | any) {
// Can't access file, probably offline
if (existingAssetEntity) {
// Mark asset as offline
this.logger.debug(`Marking asset as offline: ${assetPath}`);
await this.assetRepository.update({ id: existingAssetEntity.id, isOffline: true });
return JobStatus.SUCCESS;
} else {
// File can't be accessed and does not already exist in db
throw new BadRequestException('Cannot access file', { cause: error });
}
}
let doImport = false;
let doRefresh = false;
if (job.force) {
doRefresh = true;
}
const originalFileName = parse(assetPath).base;
if (!existingAssetEntity) {
// This asset is new to us, read it from disk
this.logger.debug(`Importing new asset: ${assetPath}`);
doImport = true;
} else if (stats.mtime.toISOString() !== existingAssetEntity.fileModifiedAt.toISOString()) {
// File modification time has changed since last time we checked, re-read from disk
this.logger.debug(
`File modification time has changed, re-importing asset: ${assetPath}. Old mtime: ${existingAssetEntity.fileModifiedAt}. New mtime: ${stats.mtime}`,
);
doRefresh = true;
} else if (existingAssetEntity.originalFileName !== originalFileName) {
// TODO: We can likely remove this check in the second half of 2024 when all assets have likely been re-imported by all users
this.logger.debug(
`Asset is missing file extension, re-importing: ${assetPath}. Current incorrect filename: ${existingAssetEntity.originalFileName}.`,
);
doRefresh = true;
} else if (!job.force && stats && !existingAssetEntity.isOffline) {
// Asset exists on disk and in db and mtime has not changed. Also, we are not forcing refresn. Therefore, do nothing
this.logger.debug(`Asset already exists in database and on disk, will not import: ${assetPath}`);
}
if (stats && existingAssetEntity?.isOffline) {
// File was previously offline but is now online
this.logger.debug(`Marking previously-offline asset as online: ${assetPath}`);
await this.assetRepository.update({ id: existingAssetEntity.id, isOffline: false });
doRefresh = true;
}
if (!doImport && !doRefresh) {
// If we don't import, exit here
let asset = await this.assetRepository.getByLibraryIdAndOriginalPath(job.id, assetPath);
if (asset) {
return JobStatus.SKIPPED;
}
let assetType: AssetType;
if (mimeTypes.isImage(assetPath)) {
assetType = AssetType.IMAGE;
} else if (mimeTypes.isVideo(assetPath)) {
assetType = AssetType.VIDEO;
} else {
throw new BadRequestException(`Unsupported file type ${assetPath}`);
let stat;
try {
stat = await this.storageRepository.stat(assetPath);
} catch (error: any) {
if (error.code === 'ENOENT') {
this.logger.error(`File not found: ${assetPath}`);
return JobStatus.SKIPPED;
}
this.logger.error(`Error reading file: ${assetPath}. Error: ${error}`);
return JobStatus.FAILED;
}
this.logger.log(`Importing new library asset: ${assetPath}`);
const library = await this.repository.get(job.id, true);
if (!library || library.deletedAt) {
this.logger.error('Cannot import asset into deleted library');
return JobStatus.FAILED;
}
// TODO: device asset id is deprecated, remove it
const deviceAssetId = `${basename(assetPath)}`.replaceAll(/\s+/g, '');
const pathHash = this.cryptoRepository.hashSha1(`path:${assetPath}`);
// TODO: doesn't xmp replace the file extension? Will need investigation
let sidecarPath: string | null = null;
if (await this.storageRepository.checkFileExists(`${assetPath}.xmp`, R_OK)) {
sidecarPath = `${assetPath}.xmp`;
}
// TODO: device asset id is deprecated, remove it
const deviceAssetId = `${basename(assetPath)}`.replaceAll(/\s+/g, '');
const assetType = mimeTypes.isVideo(assetPath) ? AssetType.VIDEO : AssetType.IMAGE;
let assetId;
if (doImport) {
const library = await this.repository.get(job.id, true);
if (library?.deletedAt) {
this.logger.error('Cannot import asset into deleted library');
return JobStatus.FAILED;
}
const mtime = stat.mtime;
const pathHash = this.cryptoRepository.hashSha1(`path:${assetPath}`);
asset = await this.assetRepository.create({
ownerId: job.ownerId,
libraryId: job.id,
checksum: pathHash,
originalPath: assetPath,
deviceAssetId,
deviceId: 'Library Import',
fileCreatedAt: mtime,
fileModifiedAt: mtime,
localDateTime: mtime,
type: assetType,
originalFileName: parse(assetPath).base,
// TODO: In wait of refactoring the domain asset service, this function is just manually written like this
const addedAsset = await this.assetRepository.create({
ownerId: job.ownerId,
libraryId: job.id,
checksum: pathHash,
originalPath: assetPath,
deviceAssetId,
deviceId: 'Library Import',
fileCreatedAt: stats.mtime,
fileModifiedAt: stats.mtime,
localDateTime: stats.mtime,
type: assetType,
originalFileName,
sidecarPath,
isExternal: true,
});
assetId = addedAsset.id;
} else if (doRefresh && existingAssetEntity) {
assetId = existingAssetEntity.id;
await this.assetRepository.updateAll([existingAssetEntity.id], {
fileCreatedAt: stats.mtime,
fileModifiedAt: stats.mtime,
originalFileName,
});
} else {
// Not importing and not refreshing, do nothing
return JobStatus.SKIPPED;
}
sidecarPath,
isExternal: true,
});
this.logger.debug(`Queueing metadata extraction for: ${assetPath}`);
await this.jobRepository.queue({ name: JobName.METADATA_EXTRACTION, data: { id: assetId, source: 'upload' } });
if (assetType === AssetType.VIDEO) {
await this.jobRepository.queue({ name: JobName.VIDEO_CONVERSION, data: { id: assetId } });
}
await this.queuePostSyncJobs(asset);
return JobStatus.SUCCESS;
}
async queueScan(id: string, dto: ScanLibraryDto) {
async queuePostSyncJobs(asset: AssetEntity) {
this.logger.debug(`Queueing metadata extraction for: ${asset.originalPath}`);
await this.jobRepository.queue({ name: JobName.METADATA_EXTRACTION, data: { id: asset.id, source: 'upload' } });
if (asset.type === AssetType.VIDEO) {
await this.jobRepository.queue({ name: JobName.VIDEO_CONVERSION, data: { id: asset.id } });
}
}
async queueScan(id: string) {
await this.findOrFail(id);
await this.jobRepository.queue({
name: JobName.LIBRARY_SCAN,
name: JobName.LIBRARY_QUEUE_SYNC_FILES,
data: {
id,
refreshModifiedFiles: dto.refreshModifiedFiles ?? false,
refreshAllFiles: dto.refreshAllFiles ?? false,
},
});
await this.jobRepository.queue({ name: JobName.LIBRARY_QUEUE_SYNC_ASSETS, data: { id } });
}
async queueRemoveOffline(id: string) {
this.logger.verbose(`Queueing offline file removal from library ${id}`);
await this.jobRepository.queue({ name: JobName.LIBRARY_REMOVE_OFFLINE, data: { id } });
}
async handleQueueAllScan(job: IBaseJob): Promise<JobStatus> {
this.logger.debug(`Refreshing all external libraries: force=${job.force}`);
async handleQueueSyncAll(): Promise<JobStatus> {
this.logger.debug(`Refreshing all external libraries`);
await this.jobRepository.queue({ name: JobName.LIBRARY_QUEUE_CLEANUP, data: {} });
const libraries = await this.repository.getAll(true);
await this.jobRepository.queueAll(
libraries.map((library) => ({
name: JobName.LIBRARY_SCAN,
name: JobName.LIBRARY_QUEUE_SYNC_FILES,
data: {
id: library.id,
},
})),
);
await this.jobRepository.queueAll(
libraries.map((library) => ({
name: JobName.LIBRARY_QUEUE_SYNC_ASSETS,
data: {
id: library.id,
refreshModifiedFiles: !job.force,
refreshAllFiles: job.force ?? false,
},
})),
);
return JobStatus.SUCCESS;
}
async handleOfflineCheck(job: ILibraryOfflineJob): Promise<JobStatus> {
async handleSyncAsset(job: ILibraryAssetJob): Promise<JobStatus> {
const asset = await this.assetRepository.getById(job.id);
if (!asset) {
// Asset is no longer in the database, skip
return JobStatus.SKIPPED;
}
if (asset.isOffline) {
this.logger.verbose(`Asset is already offline: ${asset.originalPath}`);
return JobStatus.SUCCESS;
}
const markOffline = async (explanation: string) => {
if (!asset.isOffline) {
this.logger.debug(`${explanation}, removing: ${asset.originalPath}`);
await this.assetRepository.updateAll([asset.id], { isOffline: true, deletedAt: new Date() });
}
};
const isInPath = job.importPaths.find((path) => asset.originalPath.startsWith(path));
if (!isInPath) {
this.logger.debug(`Asset is no longer in an import path, marking offline: ${asset.originalPath}`);
await this.assetRepository.update({ id: asset.id, isOffline: true });
await markOffline('Asset is no longer in an import path');
return JobStatus.SUCCESS;
}
const isExcluded = job.exclusionPatterns.some((pattern) => picomatch.isMatch(asset.originalPath, pattern));
if (isExcluded) {
this.logger.debug(`Asset is covered by an exclusion pattern, marking offline: ${asset.originalPath}`);
await this.assetRepository.update({ id: asset.id, isOffline: true });
await markOffline('Asset is covered by an exclusion pattern');
return JobStatus.SUCCESS;
}
const fileExists = await this.storageRepository.checkFileExists(asset.originalPath, R_OK);
if (!fileExists) {
this.logger.debug(`Asset is no longer found on disk, marking offline: ${asset.originalPath}`);
await this.assetRepository.update({ id: asset.id, isOffline: true });
let stat;
try {
stat = await this.storageRepository.stat(asset.originalPath);
} catch {
await markOffline('Asset is no longer on disk or is inaccessible because of permissions');
return JobStatus.SUCCESS;
}
this.logger.verbose(
`Asset is found on disk, not covered by an exclusion pattern, and is in an import path, keeping online: ${asset.originalPath}`,
);
const mtime = stat.mtime;
const isAssetModified = mtime.toISOString() !== asset.fileModifiedAt.toISOString();
if (asset.isOffline || isAssetModified) {
this.logger.debug(`Asset was offline or modified, updating asset record ${asset.originalPath}`);
//TODO: When we have asset status, we need to leave deletedAt as is when status is trashed
await this.assetRepository.updateAll([asset.id], {
isOffline: false,
deletedAt: null,
fileCreatedAt: mtime,
fileModifiedAt: mtime,
originalFileName: parse(asset.originalPath).base,
});
}
if (isAssetModified) {
this.logger.debug(`Asset was modified, queuing metadata extraction for: ${asset.originalPath}`);
await this.queuePostSyncJobs(asset);
}
return JobStatus.SUCCESS;
}
async handleRemoveOffline(job: IEntityJob): Promise<JobStatus> {
this.logger.debug(`Removing offline assets for library ${job.id}`);
const assetPagination = usePagination(JOBS_LIBRARY_PAGINATION_SIZE, (pagination) =>
this.assetRepository.getWith(pagination, WithProperty.IS_OFFLINE, job.id, true),
);
let offlineAssets = 0;
for await (const assets of assetPagination) {
offlineAssets += assets.length;
if (assets.length > 0) {
this.logger.debug(`Discovered ${offlineAssets} offline assets in library ${job.id}`);
await this.jobRepository.queueAll(
assets.map((asset) => ({
name: JobName.ASSET_DELETION,
data: {
id: asset.id,
deleteOnDisk: false,
},
})),
);
this.logger.verbose(`Queued deletion of ${assets.length} offline assets in library ${job.id}`);
}
}
if (offlineAssets) {
this.logger.debug(`Finished queueing deletion of ${offlineAssets} offline assets for library ${job.id}`);
} else {
this.logger.debug(`Found no offline assets to delete from library ${job.id}`);
}
return JobStatus.SUCCESS;
}
async handleQueueAssetRefresh(job: ILibraryRefreshJob): Promise<JobStatus> {
async handleQueueSyncFiles(job: IEntityJob): Promise<JobStatus> {
const library = await this.repository.get(job.id);
if (!library) {
this.logger.debug(`Library ${job.id} not found, skipping refresh`);
return JobStatus.SKIPPED;
}
this.logger.log(`Refreshing library ${library.id}`);
this.logger.log(`Refreshing library ${library.id} for new assets`);
const validImportPaths: string[] = [];
@ -630,55 +560,66 @@ export class LibraryService {
}
}
if (validImportPaths.length === 0) {
if (validImportPaths) {
const assetsOnDisk = this.storageRepository.walk({
pathsToCrawl: validImportPaths,
includeHidden: false,
exclusionPatterns: library.exclusionPatterns,
take: JOBS_LIBRARY_PAGINATION_SIZE,
});
let count = 0;
for await (const assetBatch of assetsOnDisk) {
count += assetBatch.length;
this.logger.debug(`Discovered ${count} asset(s) on disk for library ${library.id}...`);
await this.syncFiles(library, assetBatch);
this.logger.verbose(`Queued scan of ${assetBatch.length} crawled asset(s) in library ${library.id}...`);
}
if (count > 0) {
this.logger.debug(`Finished queueing scan of ${count} assets on disk for library ${library.id}`);
} else {
this.logger.debug(`No non-excluded assets found in any import path for library ${library.id}`);
}
} else {
this.logger.warn(`No valid import paths found for library ${library.id}`);
}
const assetsOnDisk = this.storageRepository.walk({
pathsToCrawl: validImportPaths,
includeHidden: false,
exclusionPatterns: library.exclusionPatterns,
take: JOBS_LIBRARY_PAGINATION_SIZE,
});
await this.repository.update({ id: job.id, refreshedAt: new Date() });
let crawledAssets = 0;
return JobStatus.SUCCESS;
}
for await (const assetBatch of assetsOnDisk) {
crawledAssets += assetBatch.length;
this.logger.debug(`Discovered ${crawledAssets} asset(s) on disk for library ${library.id}...`);
await this.scanAssets(job.id, assetBatch, library.ownerId, job.refreshAllFiles ?? false);
this.logger.verbose(`Queued scan of ${assetBatch.length} crawled asset(s) in library ${library.id}...`);
async handleQueueSyncAssets(job: IEntityJob): Promise<JobStatus> {
const library = await this.repository.get(job.id);
if (!library) {
return JobStatus.SKIPPED;
}
if (crawledAssets) {
this.logger.debug(`Finished queueing scan of ${crawledAssets} assets on disk for library ${library.id}`);
} else {
this.logger.debug(`No non-excluded assets found in any import path for library ${library.id}`);
}
this.logger.log(`Scanning library ${library.id} for removed assets`);
const onlineAssets = usePagination(JOBS_LIBRARY_PAGINATION_SIZE, (pagination) =>
this.assetRepository.getWith(pagination, WithProperty.IS_ONLINE, job.id),
this.assetRepository.getAll(pagination, { libraryId: job.id }),
);
let onlineAssetCount = 0;
let assetCount = 0;
for await (const assets of onlineAssets) {
onlineAssetCount += assets.length;
this.logger.debug(`Discovered ${onlineAssetCount} asset(s) in library ${library.id}...`);
assetCount += assets.length;
this.logger.debug(`Discovered ${assetCount} asset(s) in library ${library.id}...`);
await this.jobRepository.queueAll(
assets.map((asset) => ({
name: JobName.LIBRARY_CHECK_OFFLINE,
data: { id: asset.id, importPaths: validImportPaths, exclusionPatterns: library.exclusionPatterns },
name: JobName.LIBRARY_SYNC_ASSET,
data: { id: asset.id, importPaths: library.importPaths, exclusionPatterns: library.exclusionPatterns },
})),
);
this.logger.debug(`Queued online check of ${assets.length} asset(s) in library ${library.id}...`);
this.logger.debug(`Queued check of ${assets.length} asset(s) in library ${library.id}...`);
}
if (onlineAssetCount) {
this.logger.log(`Finished queueing online check of ${onlineAssetCount} assets for library ${library.id}`);
if (assetCount) {
this.logger.log(`Finished queueing check of ${assetCount} assets for library ${library.id}`);
}
await this.repository.update({ id: job.id, refreshedAt: new Date() });
return JobStatus.SUCCESS;
}