diff --git a/cli/src/commands/asset.ts b/cli/src/commands/asset.ts
index 2af3cf8d5e..a6c5bd075a 100644
--- a/cli/src/commands/asset.ts
+++ b/cli/src/commands/asset.ts
@@ -9,12 +9,13 @@ import {
   createAlbum,
   defaults,
   getAllAlbums,
+  getAssetsByOriginalPath,
   getSupportedMediaTypes,
 } from '@immich/sdk';
 import byteSize from 'byte-size';
 import { Matcher, watch as watchFs } from 'chokidar';
 import { MultiBar, Presets, SingleBar } from 'cli-progress';
-import { chunk } from 'lodash-es';
+import { chunk, groupBy } from 'lodash-es';
 import micromatch from 'micromatch';
 import { Stats, createReadStream } from 'node:fs';
 import { stat, unlink } from 'node:fs/promises';
@@ -44,6 +45,8 @@ export interface UploadOptionsDto {
   progress?: boolean;
   watch?: boolean;
   jsonOutput?: boolean;
+  replaceAssets?: boolean;
+  replacePath?: string;
 }
 
 class UploadFile extends File {
@@ -66,6 +69,9 @@
 const uploadBatch = async (files: string[], options: UploadOptionsDto) => {
   const { newFiles, duplicates } = await checkForDuplicates(files, options);
   const newAssets = await uploadFiles(newFiles, options);
+  if (options.replaceAssets) {
+    await replaceFiles(duplicates, options);
+  }
   if (options.jsonOutput) {
     console.log(JSON.stringify({ newFiles, duplicates, newAssets }, undefined, 4));
   }
@@ -174,7 +180,7 @@ const scan = async (pathsToCrawl: string[], options: UploadOptionsDto) => {
   return files;
 };
 
-export const checkForDuplicates = async (files: string[], { concurrency, skipHash, progress }: UploadOptionsDto) => {
+export const checkForDuplicates = async (files: string[], { concurrency, skipHash, progress, replacePath }: UploadOptionsDto) => {
   if (skipHash) {
     console.log('Skipping hash check, assuming all files are new');
     return { newFiles: files, duplicates: [] };
@@ -191,8 +197,8 @@ export const checkForDuplicates = async (files: string[], { concurrency, skipHas
     console.log(`Received ${files.length} files, hashing...`);
   }
 
-  const hashProgressBar = multiBar?.create(files.length, 0, { message: 'Hashing files          ' });
-  const checkProgressBar = multiBar?.create(files.length, 0, { message: 'Checking for duplicates' });
+  const hashProgressBar = multiBar?.create(files.length, 0, { message: 'Hashing files                              ' });
+  const checkProgressBar = multiBar?.create(files.length, 0, { message: 'Checking for duplicates (by content hash) ' });
 
   const newFiles: string[] = [];
   const duplicates: Asset[] = [];
@@ -250,9 +256,52 @@ export const checkForDuplicates = async (files: string[], { concurrency, skipHas
 
   await checkBulkUploadQueue.drained();
 
+  const [localPathPrefix, serverPathPrefix] = replacePath?.split(':') || [];
+  // e.g. with --replace-path /import:/data/import/pictures:
+  //   localPathPrefix = '/import', serverPathPrefix = '/data/import/pictures'
+  const realNewFiles: string[] = [];
+  if (localPathPrefix && serverPathPrefix) {
+    const newFilePaths = groupBy(newFiles, (p) => path.dirname(p));
+
+    const originalPathsProgressBar = multiBar?.create(Object.keys(newFilePaths).length, 0, { message: 'Checking for duplicates (by original paths)' });
+
+    const checkOriginalPathQueue = new Queue(
+      async (localDir: string) => {
+        const assets = Object.fromEntries(
+          (await getAssetsByOriginalPath({ path: serverPathPrefix + localDir.slice(localPathPrefix.length) }))
+            .map((asset) => [localPathPrefix + asset.originalPath.slice(serverPathPrefix.length), asset]),
+        );
+
+        for (const filepath of newFilePaths[localDir]) {
+          const asset = assets[filepath];
+          if (asset) {
+            duplicates.push({ id: asset.id, filepath });
+          } else {
+            realNewFiles.push(filepath);
+          }
+        }
+
+        originalPathsProgressBar?.increment();
+      },
+      { concurrency, retry: 3 },
+    );
+
+    for (const [dirpath, files] of Object.entries(newFilePaths)) {
+      if (dirpath.startsWith(localPathPrefix)) {
+        void checkOriginalPathQueue.push(dirpath);
+      } else {
+        realNewFiles.push(...files);
+      }
+    }
+
+    await checkOriginalPathQueue.drained();
+  } else {
+    realNewFiles.push(...newFiles);
+  }
+
   multiBar?.stop();
 
-  console.log(`Found ${newFiles.length} new files and ${duplicates.length} duplicate${s(duplicates.length)}`);
+  console.log(`Found ${realNewFiles.length} new files and ${duplicates.length} duplicate${s(duplicates.length)}`);
 
   // Report failures
   const failedTasks = queue.tasks.filter((task) => task.status === 'failed');
@@ -263,7 +312,110 @@ export const checkForDuplicates = async (files: string[], { concurrency, skipHas
     }
   }
 
-  return { newFiles, duplicates };
+  return { newFiles: realNewFiles, duplicates };
+};
+
+export const replaceFiles = async (
+  assets: Asset[],
+  { dryRun, concurrency, progress }: UploadOptionsDto,
+): Promise<void> => {
+  if (assets.length === 0) {
+    return;
+  }
+
+  // Compute total size first
+  let totalSize = 0;
+  const statsMap = new Map<string, Stats>();
+  for (const { filepath } of assets) {
+    const stats = await stat(filepath);
+    statsMap.set(filepath, stats);
+    totalSize += stats.size;
+  }
+
+  if (dryRun) {
+    console.log(`Would have uploaded ${assets.length} replacement${s(assets.length)} (${byteSize(totalSize)})`);
+    return;
+  }
+
+  let uploadProgress: SingleBar | undefined;
+
+  if (progress) {
+    uploadProgress = new SingleBar(
+      {
+        format: 'Uploading replacements | {bar} | {percentage}% | ETA: {eta_formatted} | {value_formatted}/{total_formatted}',
+      },
+      Presets.shades_classic,
+    );
+  } else {
+    console.log(`Uploading ${assets.length} replacement${s(assets.length)} (${byteSize(totalSize)})`);
+  }
+  uploadProgress?.start(totalSize, 0);
+  uploadProgress?.update({ value_formatted: 0, total_formatted: byteSize(totalSize) });
+
+  let successCount = 0;
+  let successSize = 0;
+
+  const queue = new Queue(
+    async (asset: Asset) => {
+      const stats = statsMap.get(asset.filepath);
+      if (!stats) {
+        throw new Error(`Stats not found for ${asset.filepath}`);
+      }
+
+      const response = await replaceFile(asset, stats);
+      successCount++;
+      successSize += stats.size ?? 0;
+
+      uploadProgress?.update(successSize, { value_formatted: byteSize(successSize) });
+
+      return response;
+    },
+    { concurrency, retry: 3 },
+  );
+
+  for (const item of assets) {
+    void queue.push(item);
+  }
+
+  await queue.drained();
+
+  uploadProgress?.stop();
+
+  console.log(`Successfully uploaded ${successCount} replacement${s(successCount)} (${byteSize(successSize)})`);
+
+  // Report failures
+  const failedTasks = queue.tasks.filter((task) => task.status === 'failed');
+  if (failedTasks.length > 0) {
+    console.log(`Failed to upload ${failedTasks.length} replacement${s(failedTasks.length)}:`);
+    for (const task of failedTasks) {
+      console.log(`- ${task.data} - ${task.error}`);
+    }
+  }
+};
+
+const replaceFile = async (input: Asset, stats: Stats): Promise<unknown> => {
+  const { baseUrl, headers } = defaults;
+
+  const formData = new FormData();
+  formData.append('deviceAssetId', `${basename(input.filepath)}-${stats.size}`.replaceAll(/\s+/g, ''));
+  formData.append('deviceId', 'CLI');
+  formData.append('fileCreatedAt', stats.mtime.toISOString());
+  formData.append('fileModifiedAt', stats.mtime.toISOString());
+  formData.append('fileSize', String(stats.size));
+  formData.append('assetData', new UploadFile(input.filepath, stats.size));
+  formData.append('filename', basename(input.filepath));
+
+  const response = await fetch(`${baseUrl}/assets/${input.id}/original`, {
+    method: 'put',
+    redirect: 'error',
+    headers: headers as Record<string, string>,
+    body: formData,
+  });
+  if (response.status !== 200) {
+    throw new Error(await response.text());
+  }
+
+  return response.json();
 };
 
 export const uploadFiles = async (
diff --git a/cli/src/index.ts b/cli/src/index.ts
index a0392186c0..2fd6fac6a5 100644
--- a/cli/src/index.ts
+++ b/cli/src/index.ts
@@ -47,6 +47,8 @@ program
   .addOption(new Option('-i, --ignore <pattern>', 'Pattern to ignore').env('IMMICH_IGNORE_PATHS'))
   .addOption(new Option('-h, --skip-hash', "Don't hash files before upload").env('IMMICH_SKIP_HASH').default(false))
   .addOption(new Option('-H, --include-hidden', 'Include hidden folders').env('IMMICH_INCLUDE_HIDDEN').default(false))
+  .addOption(new Option('-R, --replace-assets', 'Replace matching server assets with the uploaded files').env('IMMICH_REPLACE_ASSETS').default(false))
+  .addOption(new Option('-P, --replace-path <path>', 'Local-to-server path prefix mapping (local:server) for matching duplicates by original path').env('IMMICH_REPLACE_PATH'))
   .addOption(
     new Option('-a, --album', 'Automatically create albums based on folder name')
       .env('IMMICH_AUTO_CREATE_ALBUM')
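
A minimal standalone sketch (not part of the patch) of the local-to-server prefix mapping behind --replace-path, assuming the example value /import:/data/import/pictures from the comment in checkForDuplicates; toServerPath and toLocalPath are illustrative names only. The patch applies the same slice-based mapping, guarded by dirpath.startsWith(localPathPrefix), so paths outside the local prefix are never rewritten:

    // Hypothetical sketch of the --replace-path prefix mapping (TypeScript).
    const replacePath = '/import:/data/import/pictures'; // assumed example value
    const [localPathPrefix, serverPathPrefix] = replacePath.split(':');

    // Local file -> the originalPath the server would report for it.
    const toServerPath = (localFile: string): string =>
      serverPathPrefix + localFile.slice(localPathPrefix.length);

    // Server originalPath -> the local file it corresponds to.
    const toLocalPath = (originalPath: string): string =>
      localPathPrefix + originalPath.slice(serverPathPrefix.length);

    console.log(toServerPath('/import/vacation/img.jpg')); // /data/import/pictures/vacation/img.jpg
    console.log(toLocalPath('/data/import/pictures/vacation/img.jpg')); // /import/vacation/img.jpg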
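
Example invocation (hypothetical paths; both flags are introduced by this patch):

    immich upload --replace-assets --replace-path /import:/data/import/pictures /import

Files already known by content hash, plus files whose mapped original path matches an existing asset, are reported as duplicates; with --replace-assets their current local content is then uploaded to the matching asset IDs via PUT /assets/{id}/original instead of being skipped.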